Chromium Code Reviews
Help | Chromium Project | Sign in
(289)

Side by Side Diff: sync/engine/sync_scheduler_impl.cc

Issue 10917234: sync: make scheduling logic and job ownership more obvious. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: address review Created 1 year, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/engine/sync_scheduler_impl.h" 5 #include "sync/engine/sync_scheduler_impl.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <cstring> 8 #include <cstring>
9 9
10 #include "base/auto_reset.h" 10 #include "base/auto_reset.h"
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/bind_helpers.h"
12 #include "base/compiler_specific.h" 13 #include "base/compiler_specific.h"
13 #include "base/location.h" 14 #include "base/location.h"
14 #include "base/logging.h" 15 #include "base/logging.h"
15 #include "base/message_loop.h" 16 #include "base/message_loop.h"
16 #include "sync/engine/backoff_delay_provider.h" 17 #include "sync/engine/backoff_delay_provider.h"
17 #include "sync/engine/syncer.h" 18 #include "sync/engine/syncer.h"
18 #include "sync/engine/throttled_data_type_tracker.h" 19 #include "sync/engine/throttled_data_type_tracker.h"
19 #include "sync/protocol/proto_enum_conversions.h" 20 #include "sync/protocol/proto_enum_conversions.h"
20 #include "sync/protocol/sync.pb.h" 21 #include "sync/protocol/sync.pb.h"
21 #include "sync/util/data_type_histogram.h" 22 #include "sync/util/data_type_histogram.h"
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
76 : source(source), 77 : source(source),
77 types_to_download(types_to_download), 78 types_to_download(types_to_download),
78 routing_info(routing_info), 79 routing_info(routing_info),
79 ready_task(ready_task) { 80 ready_task(ready_task) {
80 DCHECK(!ready_task.is_null()); 81 DCHECK(!ready_task.is_null());
81 } 82 }
82 ConfigurationParams::~ConfigurationParams() {} 83 ConfigurationParams::~ConfigurationParams() {}
83 84
84 SyncSchedulerImpl::WaitInterval::WaitInterval() 85 SyncSchedulerImpl::WaitInterval::WaitInterval()
85 : mode(UNKNOWN), 86 : mode(UNKNOWN),
86 had_nudge(false) { 87 had_nudge(false),
87 } 88 pending_configure_job(NULL) {}
89
90 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
91 : mode(mode), had_nudge(false), length(length),
92 pending_configure_job(NULL) {}
88 93
89 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} 94 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
90 95
91 #define ENUM_CASE(x) case x: return #x; break; 96 #define ENUM_CASE(x) case x: return #x; break;
92 97
93 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { 98 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
94 switch (mode) { 99 switch (mode) {
95 ENUM_CASE(UNKNOWN); 100 ENUM_CASE(UNKNOWN);
96 ENUM_CASE(EXPONENTIAL_BACKOFF); 101 ENUM_CASE(EXPONENTIAL_BACKOFF);
97 ENUM_CASE(THROTTLED); 102 ENUM_CASE(THROTTLED);
98 } 103 }
99 NOTREACHED(); 104 NOTREACHED();
100 return ""; 105 return "";
101 } 106 }
102 107
103 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob()
104 : purpose(UNKNOWN),
105 is_canary_job(false) {
106 }
107
108 SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {}
109
110 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
111 base::TimeTicks start,
112 linked_ptr<sessions::SyncSession> session,
113 bool is_canary_job,
114 const ConfigurationParams& config_params,
115 const tracked_objects::Location& from_here)
116 : purpose(purpose),
117 scheduled_start(start),
118 session(session),
119 is_canary_job(is_canary_job),
120 config_params(config_params),
121 from_here(from_here) {
122 }
123
124 const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString(
125 SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) {
126 switch (purpose) {
127 ENUM_CASE(UNKNOWN);
128 ENUM_CASE(POLL);
129 ENUM_CASE(NUDGE);
130 ENUM_CASE(CONFIGURATION);
131 }
132 NOTREACHED();
133 return "";
134 }
135
136 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( 108 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
137 NudgeSource source) { 109 NudgeSource source) {
138 switch (source) { 110 switch (source) {
139 case NUDGE_SOURCE_NOTIFICATION: 111 case NUDGE_SOURCE_NOTIFICATION:
140 return GetUpdatesCallerInfo::NOTIFICATION; 112 return GetUpdatesCallerInfo::NOTIFICATION;
141 case NUDGE_SOURCE_LOCAL: 113 case NUDGE_SOURCE_LOCAL:
142 return GetUpdatesCallerInfo::LOCAL; 114 return GetUpdatesCallerInfo::LOCAL;
143 case NUDGE_SOURCE_CONTINUATION: 115 case NUDGE_SOURCE_CONTINUATION:
144 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; 116 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
145 case NUDGE_SOURCE_LOCAL_REFRESH: 117 case NUDGE_SOURCE_LOCAL_REFRESH:
146 return GetUpdatesCallerInfo::DATATYPE_REFRESH; 118 return GetUpdatesCallerInfo::DATATYPE_REFRESH;
147 case NUDGE_SOURCE_UNKNOWN: 119 case NUDGE_SOURCE_UNKNOWN:
148 return GetUpdatesCallerInfo::UNKNOWN; 120 return GetUpdatesCallerInfo::UNKNOWN;
149 default: 121 default:
150 NOTREACHED(); 122 NOTREACHED();
151 return GetUpdatesCallerInfo::UNKNOWN; 123 return GetUpdatesCallerInfo::UNKNOWN;
152 } 124 }
153 } 125 }
154 126
155 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
156 : mode(mode), had_nudge(false), length(length) { }
157
158 // Helper macros to log with the syncer thread name; useful when there 127 // Helper macros to log with the syncer thread name; useful when there
159 // are multiple syncer threads involved. 128 // are multiple syncer threads involved.
160 129
161 #define SLOG(severity) LOG(severity) << name_ << ": " 130 #define SLOG(severity) LOG(severity) << name_ << ": "
162 131
163 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " 132 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
164 133
165 #define SDVLOG_LOC(from_here, verbose_level) \ 134 #define SDVLOG_LOC(from_here, verbose_level) \
166 DVLOG_LOC(from_here, verbose_level) << name_ << ": " 135 DVLOG_LOC(from_here, verbose_level) << name_ << ": "
167 136
(...skipping 30 matching lines...) Expand all
198 syncer_short_poll_interval_seconds_( 167 syncer_short_poll_interval_seconds_(
199 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), 168 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
200 syncer_long_poll_interval_seconds_( 169 syncer_long_poll_interval_seconds_(
201 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), 170 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
202 sessions_commit_delay_( 171 sessions_commit_delay_(
203 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), 172 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
204 mode_(NORMAL_MODE), 173 mode_(NORMAL_MODE),
205 // Start with assuming everything is fine with the connection. 174 // Start with assuming everything is fine with the connection.
206 // At the end of the sync cycle we would have the correct status. 175 // At the end of the sync cycle we would have the correct status.
207 connection_code_(HttpResponse::SERVER_CONNECTION_OK), 176 connection_code_(HttpResponse::SERVER_CONNECTION_OK),
177 pending_nudge_(NULL),
208 delay_provider_(delay_provider), 178 delay_provider_(delay_provider),
209 syncer_(syncer), 179 syncer_(syncer),
210 session_context_(context), 180 session_context_(context),
211 no_scheduling_allowed_(false) { 181 no_scheduling_allowed_(false) {
212 DCHECK(sync_loop_); 182 DCHECK(sync_loop_);
213 } 183 }
214 184
215 SyncSchedulerImpl::~SyncSchedulerImpl() { 185 SyncSchedulerImpl::~SyncSchedulerImpl() {
216 DCHECK_EQ(MessageLoop::current(), sync_loop_); 186 DCHECK_EQ(MessageLoop::current(), sync_loop_);
217 StopImpl(base::Closure()); 187 StopImpl(base::Closure());
(...skipping 16 matching lines...) Expand all
234 void SyncSchedulerImpl::OnConnectionStatusChange() { 204 void SyncSchedulerImpl::OnConnectionStatusChange() {
235 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { 205 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) {
236 // Optimistically assume that the connection is fixed and try 206 // Optimistically assume that the connection is fixed and try
237 // connecting. 207 // connecting.
238 OnServerConnectionErrorFixed(); 208 OnServerConnectionErrorFixed();
239 } 209 }
240 } 210 }
241 211
242 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { 212 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
243 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; 213 connection_code_ = HttpResponse::SERVER_CONNECTION_OK;
214 // There could be a pending nudge or configuration job in several cases:
215 //
216 // 1. We're in exponential backoff.
217 // 2. We're silenced / throttled.
218 // 3. A nudge was saved previously due to not having a valid auth token.
219 // 4. A nudge was scheduled + saved while in configuration mode.
220 //
221 // In all cases except (2), we want to retry contacting the server. We
222 // call DoCanaryJob to achieve this, and note that nothing -- not even a
223 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
224 // has the authority to do that is the Unthrottle timer.
225 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
226 if (!pending.get())
227 return;
228
244 PostTask(FROM_HERE, "DoCanaryJob", 229 PostTask(FROM_HERE, "DoCanaryJob",
245 base::Bind(&SyncSchedulerImpl::DoCanaryJob, 230 base::Bind(&SyncSchedulerImpl::DoCanaryJob,
246 weak_ptr_factory_.GetWeakPtr())); 231 weak_ptr_factory_.GetWeakPtr(),
247 232 base::Passed(&pending)));
248 } 233 }
249 234
250 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( 235 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus(
251 HttpResponse::ServerConnectionCode code) { 236 HttpResponse::ServerConnectionCode code) {
252 DCHECK_EQ(MessageLoop::current(), sync_loop_); 237 DCHECK_EQ(MessageLoop::current(), sync_loop_);
253 SDVLOG(2) << "New server connection code: " 238 SDVLOG(2) << "New server connection code: "
254 << HttpResponse::GetServerConnectionCodeString(code); 239 << HttpResponse::GetServerConnectionCodeString(code);
255 240
256 connection_code_ = code; 241 connection_code_ = code;
257 } 242 }
(...skipping 15 matching lines...) Expand all
273 Mode old_mode = mode_; 258 Mode old_mode = mode_;
274 mode_ = mode; 259 mode_ = mode;
275 AdjustPolling(NULL); // Will kick start poll timer if needed. 260 AdjustPolling(NULL); // Will kick start poll timer if needed.
276 261
277 if (old_mode != mode_) { 262 if (old_mode != mode_) {
278 // We just changed our mode. See if there are any pending jobs that we could 263 // We just changed our mode. See if there are any pending jobs that we could
279 // execute in the new mode. 264 // execute in the new mode.
280 if (mode_ == NORMAL_MODE) { 265 if (mode_ == NORMAL_MODE) {
281 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job 266 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job
282 // has not yet completed. 267 // has not yet completed.
283 DCHECK(!wait_interval_.get() || 268 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job);
284 !wait_interval_->pending_configure_job.get());
285 } 269 }
286 270
287 DoPendingJobIfPossible(false); 271 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
272 if (pending.get()) {
273 // TODO(tim): We should be able to remove this...
274 scoped_ptr<SyncSession> session(CreateSyncSession(
275 pending->session()->source()));
276 // Also the routing info might have been changed since we cached the
277 // pending nudge. Update it by coalescing to the latest.
278 pending->mutable_session()->Coalesce(*(session));
akalin 2012/10/26 06:52:29 just *session
279 SDVLOG(2) << "Executing pending job. Good luck!";
280 DoSyncSessionJob(pending.Pass());
281 }
288 } 282 }
289 } 283 }
290 284
291 void SyncSchedulerImpl::SendInitialSnapshot() { 285 void SyncSchedulerImpl::SendInitialSnapshot() {
292 DCHECK_EQ(MessageLoop::current(), sync_loop_); 286 DCHECK_EQ(MessageLoop::current(), sync_loop_);
293 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, 287 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this,
294 SyncSourceInfo(), ModelSafeRoutingInfo(), 288 SyncSourceInfo(), ModelSafeRoutingInfo(),
295 std::vector<ModelSafeWorker*>())); 289 std::vector<ModelSafeWorker*>()));
296 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); 290 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
297 event.snapshot = dummy->TakeSnapshot(); 291 event.snapshot = dummy->TakeSnapshot();
(...skipping 23 matching lines...) Expand all
321 bool SyncSchedulerImpl::ScheduleConfiguration( 315 bool SyncSchedulerImpl::ScheduleConfiguration(
322 const ConfigurationParams& params) { 316 const ConfigurationParams& params) {
323 DCHECK_EQ(MessageLoop::current(), sync_loop_); 317 DCHECK_EQ(MessageLoop::current(), sync_loop_);
324 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); 318 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
325 DCHECK_EQ(CONFIGURATION_MODE, mode_); 319 DCHECK_EQ(CONFIGURATION_MODE, mode_);
326 DCHECK(!params.ready_task.is_null()); 320 DCHECK(!params.ready_task.is_null());
327 SDVLOG(2) << "Reconfiguring syncer."; 321 SDVLOG(2) << "Reconfiguring syncer.";
328 322
329 // Only one configuration is allowed at a time. Verify we're not waiting 323 // Only one configuration is allowed at a time. Verify we're not waiting
330 // for a pending configure job. 324 // for a pending configure job.
331 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); 325 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job);
332 326
333 ModelSafeRoutingInfo restricted_routes; 327 ModelSafeRoutingInfo restricted_routes;
334 BuildModelSafeParams(params.types_to_download, 328 BuildModelSafeParams(params.types_to_download,
335 params.routing_info, 329 params.routing_info,
336 &restricted_routes); 330 &restricted_routes);
337 session_context_->set_routing_info(params.routing_info); 331 session_context_->set_routing_info(params.routing_info);
338 332
339 // Only reconfigure if we have types to download. 333 // Only reconfigure if we have types to download.
340 if (!params.types_to_download.Empty()) { 334 if (!params.types_to_download.Empty()) {
341 DCHECK(!restricted_routes.empty()); 335 DCHECK(!restricted_routes.empty());
342 linked_ptr<SyncSession> session(new SyncSession( 336 scoped_ptr<SyncSession> session(new SyncSession(
343 session_context_, 337 session_context_,
344 this, 338 this,
345 SyncSourceInfo(params.source, 339 SyncSourceInfo(params.source,
346 ModelSafeRoutingInfoToInvalidationMap( 340 ModelSafeRoutingInfoToInvalidationMap(
347 restricted_routes, 341 restricted_routes,
348 std::string())), 342 std::string())),
349 restricted_routes, 343 restricted_routes,
350 session_context_->workers())); 344 session_context_->workers()));
351 SyncSessionJob job(SyncSessionJob::CONFIGURATION, 345 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(
352 TimeTicks::Now(), 346 SyncSessionJob::CONFIGURATION,
353 session, 347 TimeTicks::Now(),
354 false, 348 session.Pass(),
355 params, 349 params,
356 FROM_HERE); 350 FROM_HERE));
357 DoSyncSessionJob(job); 351 bool succeeded = DoSyncSessionJob(job.Pass());
358 352
359 // If we failed, the job would have been saved as the pending configure 353 // If we failed, the job would have been saved as the pending configure
360 // job and a wait interval would have been set. 354 // job and a wait interval would have been set.
361 if (!session->Succeeded()) { 355 if (!succeeded) {
362 DCHECK(wait_interval_.get() && 356 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job);
363 wait_interval_->pending_configure_job.get());
364 return false; 357 return false;
365 } 358 }
366 } else { 359 } else {
367 SDVLOG(2) << "No change in routing info, calling ready task directly."; 360 SDVLOG(2) << "No change in routing info, calling ready task directly.";
368 params.ready_task.Run(); 361 params.ready_task.Run();
369 } 362 }
370 363
371 return true; 364 return true;
372 } 365 }
373 366
374 SyncSchedulerImpl::JobProcessDecision 367 SyncSchedulerImpl::JobProcessDecision
375 SyncSchedulerImpl::DecideWhileInWaitInterval( 368 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob& job) {
376 const SyncSessionJob& job) {
377 DCHECK_EQ(MessageLoop::current(), sync_loop_); 369 DCHECK_EQ(MessageLoop::current(), sync_loop_);
378 DCHECK(wait_interval_.get()); 370 DCHECK(wait_interval_.get());
379 371
380 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " 372 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode "
381 << WaitInterval::GetModeString(wait_interval_->mode) 373 << WaitInterval::GetModeString(wait_interval_->mode)
382 << (wait_interval_->had_nudge ? " (had nudge)" : "") 374 << (wait_interval_->had_nudge ? " (had nudge)" : "")
383 << (job.is_canary_job ? " (canary)" : ""); 375 << (job.is_canary() ? " (canary)" : "");
384 376
385 if (job.purpose == SyncSessionJob::POLL) 377 if (job.purpose() == SyncSessionJob::POLL)
386 return DROP; 378 return DROP;
387 379
388 DCHECK(job.purpose == SyncSessionJob::NUDGE || 380 // If we save a job while in a WaitInterval, there is a well-defined moment
389 job.purpose == SyncSessionJob::CONFIGURATION); 381 // in time in the future when it makes sense for that SAVE-worthy job to try
382 // running again -- the end of the WaitInterval.
383 DCHECK(job.purpose() == SyncSessionJob::NUDGE ||
384 job.purpose() == SyncSessionJob::CONFIGURATION);
385
386 // If throttled, there's a clock ticking to unthrottle. We want to get
387 // on the same train.
390 if (wait_interval_->mode == WaitInterval::THROTTLED) 388 if (wait_interval_->mode == WaitInterval::THROTTLED)
391 return SAVE; 389 return SAVE;
392 390
393 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); 391 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
394 if (job.purpose == SyncSessionJob::NUDGE) { 392 if (job.purpose() == SyncSessionJob::NUDGE) {
395 if (mode_ == CONFIGURATION_MODE) 393 if (mode_ == CONFIGURATION_MODE)
396 return SAVE; 394 return SAVE;
397 395
398 // If we already had one nudge then just drop this nudge. We will retry 396 // If we already had one nudge then just drop this nudge. We will retry
399 // later when the timer runs out. 397 // later when the timer runs out.
400 if (!job.is_canary_job) 398 if (!job.is_canary())
401 return wait_interval_->had_nudge ? DROP : CONTINUE; 399 return wait_interval_->had_nudge ? DROP : CONTINUE;
402 else // We are here because timer ran out. So retry. 400 else // We are here because timer ran out. So retry.
403 return CONTINUE; 401 return CONTINUE;
404 } 402 }
405 return job.is_canary_job ? CONTINUE : SAVE; 403 return job.is_canary() ? CONTINUE : SAVE;
406 } 404 }
407 405
408 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( 406 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
409 const SyncSessionJob& job) { 407 const SyncSessionJob& job) {
410 DCHECK_EQ(MessageLoop::current(), sync_loop_); 408 DCHECK_EQ(MessageLoop::current(), sync_loop_);
411 409
412 // See if our type is throttled. 410 // See if our type is throttled.
413 ModelTypeSet throttled_types = 411 ModelTypeSet throttled_types =
414 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); 412 session_context_->throttled_data_type_tracker()->GetThrottledTypes();
415 if (job.purpose == SyncSessionJob::NUDGE && 413 if (job.purpose() == SyncSessionJob::NUDGE &&
416 job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) { 414 job.session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) {
417 ModelTypeSet requested_types; 415 ModelTypeSet requested_types;
418 for (ModelTypeInvalidationMap::const_iterator i = 416 for (ModelTypeInvalidationMap::const_iterator i =
419 job.session->source().types.begin(); 417 job.session()->source().types.begin();
420 i != job.session->source().types.end(); 418 i != job.session()->source().types.end();
421 ++i) { 419 ++i) {
422 requested_types.Put(i->first); 420 requested_types.Put(i->first);
423 } 421 }
424 422
423 // If all types are throttled, do not CONTINUE. Today, we don't treat
424 // a per-datatype "unthrottle" event as something that should force a
425 // canary job. For this reason, there's no good time to reschedule this job
426 // to run -- we'll lazily wait for an independent event to trigger a sync.
427 // Note that there may already be such an event if we're in a WaitInterval,
428 // so we can retry it then.
425 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) 429 if (!requested_types.Empty() && throttled_types.HasAll(requested_types))
426 return SAVE; 430 return SAVE;
427 } 431 }
428 432
429 if (wait_interval_.get()) 433 if (wait_interval_.get())
430 return DecideWhileInWaitInterval(job); 434 return DecideWhileInWaitInterval(job);
431 435
432 if (mode_ == CONFIGURATION_MODE) { 436 if (mode_ == CONFIGURATION_MODE) {
433 if (job.purpose == SyncSessionJob::NUDGE) 437 if (job.purpose() == SyncSessionJob::NUDGE)
434 return SAVE; 438 return SAVE; // Running requires a mode switch.
akalin 2012/10/26 06:52:29 two spaces before //
435 else if (job.purpose == SyncSessionJob::CONFIGURATION) 439 else if (job.purpose() == SyncSessionJob::CONFIGURATION)
436 return CONTINUE; 440 return CONTINUE;
437 else 441 else
438 return DROP; 442 return DROP;
439 } 443 }
440 444
441 // We are in normal mode. 445 // We are in normal mode.
442 DCHECK_EQ(mode_, NORMAL_MODE); 446 DCHECK_EQ(mode_, NORMAL_MODE);
443 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); 447 DCHECK_NE(job.purpose(), SyncSessionJob::CONFIGURATION);
444 448
445 // Note about some subtle scheduling semantics. 449 // Note about some subtle scheduling semantics.
446 // 450 //
447 // It's possible at this point that |job| is known to be unnecessary, and 451 // It's possible at this point that |job| is known to be unnecessary, and
448 // dropping it would be perfectly safe and correct. Consider 452 // dropping it would be perfectly safe and correct. Consider
449 // 453 //
450 // 1) |job| is a POLL with a |scheduled_start| time that is less than 454 // 1) |job| is a POLL with a |scheduled_start| time that is less than
451 // the time that the last successful all-datatype NUDGE completed. 455 // the time that the last successful all-datatype NUDGE completed.
452 // 456 //
453 // 2) |job| is a NUDGE (for any combination of types) with a 457 // 2) |job| is a NUDGE (for any combination of types) with a
(...skipping 22 matching lines...) Expand all
476 // * It's not strictly "impossible", but it would be reentrant and hence 480 // * It's not strictly "impossible", but it would be reentrant and hence
477 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a 481 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a
478 // legal side effect of any of the work being done as part of a sync cycle. 482 // legal side effect of any of the work being done as part of a sync cycle.
479 // See |no_scheduling_allowed_| for details. 483 // See |no_scheduling_allowed_| for details.
480 484
481 // Decision now rests on state of auth tokens. 485 // Decision now rests on state of auth tokens.
482 if (!session_context_->connection_manager()->HasInvalidAuthToken()) 486 if (!session_context_->connection_manager()->HasInvalidAuthToken())
483 return CONTINUE; 487 return CONTINUE;
484 488
485 SDVLOG(2) << "No valid auth token. Using that to decide on job."; 489 SDVLOG(2) << "No valid auth token. Using that to decide on job.";
486 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; 490 // Running the job would require updated auth, so we can't honour
491 // job.scheduled_start().
492 return job.purpose() == SyncSessionJob::NUDGE ? SAVE : DROP;
487 } 493 }
488 494
489 void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) { 495 void SyncSchedulerImpl::HandleSaveJobDecision(scoped_ptr<SyncSessionJob> job) {
490 DCHECK_EQ(MessageLoop::current(), sync_loop_); 496 DCHECK_EQ(DecideOnJob(*job.get()), SAVE);
akalin 2012/10/26 06:52:29 just *job
491 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); 497 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE;
492 if (pending_nudge_.get() == NULL) { 498 if (is_nudge && pending_nudge_) {
493 SDVLOG(2) << "Creating a pending nudge job"; 499 SDVLOG(2) << "Coalescing a pending nudge";
494 SyncSession* s = job.session.get(); 500 // TODO(tim): This basically means we never use the more-careful coalescing
495 501 // logic in ScheduleNudgeImpl that takes the min of the two nudge start
496 // Get a fresh session with similar configuration as before (resets 502 // times, because we're calling this function first. Pull this out
497 // StatusController). 503 // into a function to coalesce + set start times and reuse.
498 scoped_ptr<SyncSession> session(new SyncSession(s->context(), 504 pending_nudge_->mutable_session()->Coalesce(*(job->session()));
499 s->delegate(), s->source(), s->routing_info(), s->workers()));
500
501 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
502 make_linked_ptr(session.release()), false,
503 ConfigurationParams(), job.from_here);
504 pending_nudge_.reset(new SyncSessionJob(new_job));
505 return; 505 return;
506 } 506 }
507 507
508 SDVLOG(2) << "Coalescing a pending nudge"; 508 scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon();
509 pending_nudge_->session->Coalesce(*(job.session.get())); 509 if (wait_interval_.get() && !wait_interval_->pending_configure_job) {
510 pending_nudge_->scheduled_start = job.scheduled_start; 510 // This job should be made the new canary.
511 if (is_nudge) {
512 pending_nudge_ = job_to_save.get();
513 } else {
514 SDVLOG(2) << "Saving a configuration job";
515 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION);
516 DCHECK(!wait_interval_->pending_configure_job);
517 DCHECK_EQ(mode_, CONFIGURATION_MODE);
518 DCHECK(!job->config_params().ready_task.is_null());
519 // The only nudge that could exist is a scheduled canary nudge.
520 DCHECK(!unscheduled_nudge_storage_.get());
521 if (pending_nudge_) {
522 // Pre-empt the nudge canary and abandon the old nudge (owned by task).
523 unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon();
524 pending_nudge_ = unscheduled_nudge_storage_.get();
525 }
526 wait_interval_->pending_configure_job = job_to_save.get();
527 }
528 TimeDelta length =
529 wait_interval_->timer.desired_run_time() - TimeTicks::Now();
530 wait_interval_->length = length < TimeDelta::FromSeconds(0) ?
531 TimeDelta::FromSeconds(0) : length;
532 RestartWaiting(job_to_save.Pass());
533 return;
534 }
511 535
512 // Unfortunately the nudge location cannot be modified. So it stores the 536 // Note that today there are no cases where we SAVE a CONFIGURATION job
513 // location of the first caller. 537 // when we're not in a WaitInterval. See bug 147736.
514 } 538 DCHECK(is_nudge);
515 539 // There may or may not be a pending_configure_job. Either way this nudge
516 bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) { 540 // is unschedulable.
517 DCHECK_EQ(MessageLoop::current(), sync_loop_); 541 pending_nudge_ = job_to_save.get();
518 DCHECK(started_); 542 unscheduled_nudge_storage_ = job_to_save.Pass();
519
520 JobProcessDecision decision = DecideOnJob(job);
521 SDVLOG(2) << "Should run "
522 << SyncSessionJob::GetPurposeString(job.purpose)
523 << " job in mode " << GetModeString(mode_)
524 << ": " << GetDecisionString(decision);
525 if (decision != SAVE)
526 return decision == CONTINUE;
527
528 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose ==
529 SyncSessionJob::CONFIGURATION);
530
531 SaveJob(job);
532 return false;
533 }
534
535 void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) {
536 DCHECK_EQ(MessageLoop::current(), sync_loop_);
537 if (job.purpose == SyncSessionJob::NUDGE) {
538 SDVLOG(2) << "Saving a nudge job";
539 InitOrCoalescePendingJob(job);
540 } else if (job.purpose == SyncSessionJob::CONFIGURATION){
541 SDVLOG(2) << "Saving a configuration job";
542 DCHECK(wait_interval_.get());
543 DCHECK(mode_ == CONFIGURATION_MODE);
544
545 // Config params should always get set.
546 DCHECK(!job.config_params.ready_task.is_null());
547 SyncSession* old = job.session.get();
548 SyncSession* s(new SyncSession(session_context_, this, old->source(),
549 old->routing_info(), old->workers()));
550 SyncSessionJob new_job(job.purpose,
551 TimeTicks::Now(),
552 make_linked_ptr(s),
553 false,
554 job.config_params,
555 job.from_here);
556 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
557 } // drop the rest.
558 // TODO(sync): Is it okay to drop the rest? It's weird that
559 // SaveJob() only does what it says sometimes. (See
560 // http://crbug.com/90868.)
561 } 543 }
562 544
563 // Functor for std::find_if to search by ModelSafeGroup. 545 // Functor for std::find_if to search by ModelSafeGroup.
564 struct ModelSafeWorkerGroupIs { 546 struct ModelSafeWorkerGroupIs {
565 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} 547 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
566 bool operator()(ModelSafeWorker* w) { 548 bool operator()(ModelSafeWorker* w) {
567 return group == w->GetModelSafeGroup(); 549 return group == w->GetModelSafeGroup();
568 } 550 }
569 ModelSafeGroup group; 551 ModelSafeGroup group;
570 }; 552 };
571 553
572 void SyncSchedulerImpl::ScheduleNudgeAsync( 554 void SyncSchedulerImpl::ScheduleNudgeAsync(
573 const TimeDelta& delay, 555 const TimeDelta& desired_delay,
574 NudgeSource source, ModelTypeSet types, 556 NudgeSource source, ModelTypeSet types,
575 const tracked_objects::Location& nudge_location) { 557 const tracked_objects::Location& nudge_location) {
576 DCHECK_EQ(MessageLoop::current(), sync_loop_); 558 DCHECK_EQ(MessageLoop::current(), sync_loop_);
577 SDVLOG_LOC(nudge_location, 2) 559 SDVLOG_LOC(nudge_location, 2)
578 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " 560 << "Nudge scheduled with delay "
561 << desired_delay.InMilliseconds() << " ms, "
579 << "source " << GetNudgeSourceString(source) << ", " 562 << "source " << GetNudgeSourceString(source) << ", "
580 << "types " << ModelTypeSetToString(types); 563 << "types " << ModelTypeSetToString(types);
581 564
582 ModelTypeInvalidationMap invalidation_map = 565 ModelTypeInvalidationMap invalidation_map =
583 ModelTypeSetToInvalidationMap(types, std::string()); 566 ModelTypeSetToInvalidationMap(types, std::string());
584 SyncSchedulerImpl::ScheduleNudgeImpl(delay, 567 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay,
585 GetUpdatesFromNudgeSource(source), 568 GetUpdatesFromNudgeSource(source),
586 invalidation_map, 569 invalidation_map,
587 false,
588 nudge_location); 570 nudge_location);
589 } 571 }
590 572
591 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( 573 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync(
592 const TimeDelta& delay, 574 const TimeDelta& desired_delay,
593 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map, 575 NudgeSource source, const ModelTypeInvalidationMap& invalidation_map,
594 const tracked_objects::Location& nudge_location) { 576 const tracked_objects::Location& nudge_location) {
595 DCHECK_EQ(MessageLoop::current(), sync_loop_); 577 DCHECK_EQ(MessageLoop::current(), sync_loop_);
596 SDVLOG_LOC(nudge_location, 2) 578 SDVLOG_LOC(nudge_location, 2)
597 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " 579 << "Nudge scheduled with delay "
580 << desired_delay.InMilliseconds() << " ms, "
598 << "source " << GetNudgeSourceString(source) << ", " 581 << "source " << GetNudgeSourceString(source) << ", "
599 << "payloads " 582 << "payloads "
600 << ModelTypeInvalidationMapToString(invalidation_map); 583 << ModelTypeInvalidationMapToString(invalidation_map);
601 584
602 SyncSchedulerImpl::ScheduleNudgeImpl(delay, 585 SyncSchedulerImpl::ScheduleNudgeImpl(desired_delay,
603 GetUpdatesFromNudgeSource(source), 586 GetUpdatesFromNudgeSource(source),
604 invalidation_map, 587 invalidation_map,
605 false,
606 nudge_location); 588 nudge_location);
607 } 589 }
608 590
609 void SyncSchedulerImpl::ScheduleNudgeImpl( 591 void SyncSchedulerImpl::ScheduleNudgeImpl(
610 const TimeDelta& delay, 592 const TimeDelta& delay,
611 GetUpdatesCallerInfo::GetUpdatesSource source, 593 GetUpdatesCallerInfo::GetUpdatesSource source,
612 const ModelTypeInvalidationMap& invalidation_map, 594 const ModelTypeInvalidationMap& invalidation_map,
613 bool is_canary_job, const tracked_objects::Location& nudge_location) { 595 const tracked_objects::Location& nudge_location) {
614 DCHECK_EQ(MessageLoop::current(), sync_loop_); 596 DCHECK_EQ(MessageLoop::current(), sync_loop_);
615 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!"; 597 DCHECK(!invalidation_map.empty()) << "Nudge scheduled for no types!";
616 598
617 SDVLOG_LOC(nudge_location, 2) 599 SDVLOG_LOC(nudge_location, 2)
618 << "In ScheduleNudgeImpl with delay " 600 << "In ScheduleNudgeImpl with delay "
619 << delay.InMilliseconds() << " ms, " 601 << delay.InMilliseconds() << " ms, "
620 << "source " << GetUpdatesSourceString(source) << ", " 602 << "source " << GetUpdatesSourceString(source) << ", "
621 << "payloads " 603 << "payloads "
622 << ModelTypeInvalidationMapToString(invalidation_map) 604 << ModelTypeInvalidationMapToString(invalidation_map);
623 << (is_canary_job ? " (canary)" : "");
624 605
625 SyncSourceInfo info(source, invalidation_map); 606 SyncSourceInfo info(source, invalidation_map);
626 UpdateNudgeTimeRecords(info); 607 UpdateNudgeTimeRecords(info);
627 608
628 SyncSession* session(CreateSyncSession(info)); 609 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(
629 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, 610 SyncSessionJob::NUDGE,
630 make_linked_ptr(session), is_canary_job, 611 TimeTicks::Now() + delay,
631 ConfigurationParams(), nudge_location); 612 CreateSyncSession(info).Pass(),
613 ConfigurationParams(),
614 nudge_location));
632 615
633 session = NULL; 616 JobProcessDecision decision = DecideOnJob(*job.get());
akalin 2012/10/26 06:52:29 just *job
634 if (!ShouldRunJob(job)) 617 SDVLOG(2) << "Should run "
618 << SyncSessionJob::GetPurposeString(job->purpose())
619 << " job " << job->session()
620 << " in mode " << GetModeString(mode_)
621 << ": " << GetDecisionString(decision);
622 if (decision != CONTINUE) {
623 // End of the line, though we may save the job for later.
624 if (decision == SAVE) {
625 HandleSaveJobDecision(job.Pass());
626 } else {
627 DCHECK_EQ(decision, DROP);
628 }
635 return; 629 return;
630 }
636 631
637 if (pending_nudge_.get()) { 632 if (pending_nudge_) {
638 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
639 SDVLOG(2) << "Dropping the nudge because we are in backoff";
640 return;
641 }
642
643 SDVLOG(2) << "Coalescing pending nudge";
644 pending_nudge_->session->Coalesce(*(job.session.get()));
645
646 SDVLOG(2) << "Rescheduling pending nudge"; 633 SDVLOG(2) << "Rescheduling pending nudge";
647 SyncSession* s = pending_nudge_->session.get(); 634 pending_nudge_->mutable_session()->Coalesce(*(job->session()));
648 job.session.reset(new SyncSession(s->context(), s->delegate(), 635 // Choose the start time as the earliest of the 2. Note that this means
649 s->source(), s->routing_info(), s->workers())); 636 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds)
650 637 // but a nudge is already scheduled to go out, we'll send the (tab) commit
651 // Choose the start time as the earliest of the 2. 638 // without waiting.
652 job.scheduled_start = std::min(job.scheduled_start, 639 pending_nudge_->set_scheduled_start(
653 pending_nudge_->scheduled_start); 640 std::min(job->scheduled_start(), pending_nudge_->scheduled_start()));
654 pending_nudge_.reset(); 641 // Abandon the old task by cloning and replacing the session.
642 // It's possible that by "rescheduling" we're actually taking a job that
643 // was previously unscheduled and giving it wings, so take care to reset
644 // unscheduled nudge storage.
645 job = pending_nudge_->CloneAndAbandon();
646 unscheduled_nudge_storage_.reset();
647 pending_nudge_ = NULL;
655 } 648 }
656 649
657 // TODO(zea): Consider adding separate throttling/backoff for datatype 650 // TODO(zea): Consider adding separate throttling/backoff for datatype
658 // refresh requests. 651 // refresh requests.
659 ScheduleSyncSessionJob(job); 652 ScheduleSyncSessionJob(job.Pass());
660 } 653 }
661 654
662 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { 655 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
663 switch (mode) { 656 switch (mode) {
664 ENUM_CASE(CONFIGURATION_MODE); 657 ENUM_CASE(CONFIGURATION_MODE);
665 ENUM_CASE(NORMAL_MODE); 658 ENUM_CASE(NORMAL_MODE);
666 } 659 }
667 return ""; 660 return "";
668 } 661 }
669 662
670 const char* SyncSchedulerImpl::GetDecisionString( 663 const char* SyncSchedulerImpl::GetDecisionString(
671 SyncSchedulerImpl::JobProcessDecision mode) { 664 SyncSchedulerImpl::JobProcessDecision mode) {
672 switch (mode) { 665 switch (mode) {
673 ENUM_CASE(CONTINUE); 666 ENUM_CASE(CONTINUE);
674 ENUM_CASE(SAVE); 667 ENUM_CASE(SAVE);
675 ENUM_CASE(DROP); 668 ENUM_CASE(DROP);
676 } 669 }
677 return ""; 670 return "";
678 } 671 }
679 672
680 // static
681 void SyncSchedulerImpl::SetSyncerStepsForPurpose(
682 SyncSessionJob::SyncSessionJobPurpose purpose,
683 SyncerStep* start,
684 SyncerStep* end) {
685 switch (purpose) {
686 case SyncSessionJob::CONFIGURATION:
687 *start = DOWNLOAD_UPDATES;
688 *end = APPLY_UPDATES;
689 return;
690 case SyncSessionJob::NUDGE:
691 case SyncSessionJob::POLL:
692 *start = SYNCER_BEGIN;
693 *end = SYNCER_END;
694 return;
695 default:
696 NOTREACHED();
697 *start = SYNCER_END;
698 *end = SYNCER_END;
699 return;
700 }
701 }
702
703 void SyncSchedulerImpl::PostTask( 673 void SyncSchedulerImpl::PostTask(
704 const tracked_objects::Location& from_here, 674 const tracked_objects::Location& from_here,
705 const char* name, const base::Closure& task) { 675 const char* name, const base::Closure& task) {
706 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; 676 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task";
707 DCHECK_EQ(MessageLoop::current(), sync_loop_); 677 DCHECK_EQ(MessageLoop::current(), sync_loop_);
708 if (!started_) { 678 if (!started_) {
709 SDVLOG(1) << "Not posting task as scheduler is stopped."; 679 SDVLOG(1) << "Not posting task as scheduler is stopped.";
710 return; 680 return;
711 } 681 }
712 sync_loop_->PostTask(from_here, task); 682 sync_loop_->PostTask(from_here, task);
713 } 683 }
714 684
715 void SyncSchedulerImpl::PostDelayedTask( 685 void SyncSchedulerImpl::PostDelayedTask(
716 const tracked_objects::Location& from_here, 686 const tracked_objects::Location& from_here,
717 const char* name, const base::Closure& task, base::TimeDelta delay) { 687 const char* name, const base::Closure& task, base::TimeDelta delay) {
718 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " 688 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with "
719 << delay.InMilliseconds() << " ms delay"; 689 << delay.InMilliseconds() << " ms delay";
720 DCHECK_EQ(MessageLoop::current(), sync_loop_); 690 DCHECK_EQ(MessageLoop::current(), sync_loop_);
721 if (!started_) { 691 if (!started_) {
722 SDVLOG(1) << "Not posting task as scheduler is stopped."; 692 SDVLOG(1) << "Not posting task as scheduler is stopped.";
723 return; 693 return;
724 } 694 }
725 sync_loop_->PostDelayedTask(from_here, task, delay); 695 sync_loop_->PostDelayedTask(from_here, task, delay);
726 } 696 }
727 697
728 void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) { 698 void SyncSchedulerImpl::ScheduleSyncSessionJob(
699 scoped_ptr<SyncSessionJob> job) {
729 DCHECK_EQ(MessageLoop::current(), sync_loop_); 700 DCHECK_EQ(MessageLoop::current(), sync_loop_);
730 if (no_scheduling_allowed_) { 701 if (no_scheduling_allowed_) {
731 NOTREACHED() << "Illegal to schedule job while session in progress."; 702 NOTREACHED() << "Illegal to schedule job while session in progress.";
732 return; 703 return;
733 } 704 }
734 705
735 TimeDelta delay = job.scheduled_start - TimeTicks::Now(); 706 TimeDelta delay = job->scheduled_start() - TimeTicks::Now();
707 tracked_objects::Location loc(job->from_location());
736 if (delay < TimeDelta::FromMilliseconds(0)) 708 if (delay < TimeDelta::FromMilliseconds(0))
737 delay = TimeDelta::FromMilliseconds(0); 709 delay = TimeDelta::FromMilliseconds(0);
738 SDVLOG_LOC(job.from_here, 2) 710 SDVLOG_LOC(loc, 2)
739 << "In ScheduleSyncSessionJob with " 711 << "In ScheduleSyncSessionJob with "
740 << SyncSessionJob::GetPurposeString(job.purpose) 712 << SyncSessionJob::GetPurposeString(job->purpose())
741 << " job and " << delay.InMilliseconds() << " ms delay"; 713 << " job and " << delay.InMilliseconds() << " ms delay";
742 714
743 DCHECK(job.purpose == SyncSessionJob::NUDGE || 715 DCHECK(job->purpose() == SyncSessionJob::NUDGE ||
744 job.purpose == SyncSessionJob::POLL); 716 job->purpose() == SyncSessionJob::POLL);
745 if (job.purpose == SyncSessionJob::NUDGE) { 717 if (job->purpose() == SyncSessionJob::NUDGE) {
746 SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge"; 718 SDVLOG_LOC(loc, 2) << "Resetting pending_nudge to ";
747 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == 719 DCHECK(!pending_nudge_ || pending_nudge_->session() ==
748 job.session); 720 job->session());
749 pending_nudge_.reset(new SyncSessionJob(job)); 721 pending_nudge_ = job.get();
750 } 722 }
751 PostDelayedTask(job.from_here, "DoSyncSessionJob", 723
752 base::Bind(&SyncSchedulerImpl::DoSyncSessionJob, 724 PostDelayedTask(loc, "DoSyncSessionJob",
753 weak_ptr_factory_.GetWeakPtr(), 725 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob),
754 job), 726 weak_ptr_factory_.GetWeakPtr(),
755 delay); 727 base::Passed(&job)),
728 delay);
756 } 729 }
757 730
758 void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) { 731 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job) {
759 DCHECK_EQ(MessageLoop::current(), sync_loop_); 732 DCHECK_EQ(MessageLoop::current(), sync_loop_);
733 if (job->purpose() == SyncSessionJob::NUDGE) {
734 if (pending_nudge_ == NULL ||
735 pending_nudge_->session() != job->session()) {
736 // |job| is abandoned.
737 SDVLOG(2) << "Dropping a nudge in "
738 << "DoSyncSessionJob because another nudge was scheduled";
739 return false;
740 }
741 pending_nudge_ = NULL;
742
743 // Rebase the session with the latest model safe table and use it to purge
744 // and update any disabled or modified entries in the job.
745 job->mutable_session()->RebaseRoutingInfoWithLatest(
746 session_context_->routing_info(), session_context_->workers());
747 }
760 748
761 AutoReset<bool> protector(&no_scheduling_allowed_, true); 749 AutoReset<bool> protector(&no_scheduling_allowed_, true);
762 if (!ShouldRunJob(job)) { 750 JobProcessDecision decision = DecideOnJob(*job.get());
akalin 2012/10/26 06:52:29 *job (here and elsewhere)
763 SLOG(WARNING) 751 SDVLOG(2) << "Should run "
764 << "Not executing " 752 << SyncSessionJob::GetPurposeString(job->purpose())
765 << SyncSessionJob::GetPurposeString(job.purpose) << " job from " 753 << " job " << job->session()
766 << GetUpdatesSourceString(job.session->source().updates_source); 754 << " in mode " << GetModeString(mode_)
767 return; 755 << " with source " << job->session()->source().updates_source
756 << ": " << GetDecisionString(decision);
757 if (decision != CONTINUE) {
758 if (decision == SAVE) {
759 HandleSaveJobDecision(job.Pass());
760 } else {
761 DCHECK_EQ(decision, DROP);
762 }
763 return false;
768 } 764 }
769 765
770 if (job.purpose == SyncSessionJob::NUDGE) {
771 if (pending_nudge_.get() == NULL ||
772 pending_nudge_->session != job.session) {
773 SDVLOG(2) << "Dropping a nudge in "
774 << "DoSyncSessionJob because another nudge was scheduled";
775 return; // Another nudge must have been scheduled in in the meantime.
776 }
777 pending_nudge_.reset();
778
779 // Create the session with the latest model safe table and use it to purge
780 // and update any disabled or modified entries in the job.
781 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source()));
782
783 job.session->RebaseRoutingInfoWithLatest(*session);
784 }
785 SDVLOG(2) << "DoSyncSessionJob with " 766 SDVLOG(2) << "DoSyncSessionJob with "
786 << SyncSessionJob::GetPurposeString(job.purpose) << " job"; 767 << SyncSessionJob::GetPurposeString(job->purpose()) << " job";
787
788 SyncerStep begin(SYNCER_END);
789 SyncerStep end(SYNCER_END);
790 SetSyncerStepsForPurpose(job.purpose, &begin, &end);
791 768
792 bool has_more_to_sync = true; 769 bool has_more_to_sync = true;
793 while (ShouldRunJob(job) && has_more_to_sync) { 770 bool premature_exit = false;
771 while (DecideOnJob(*job.get()) == CONTINUE && has_more_to_sync) {
794 SDVLOG(2) << "Calling SyncShare."; 772 SDVLOG(2) << "Calling SyncShare.";
795 // Synchronously perform the sync session from this thread. 773 // Synchronously perform the sync session from this thread.
796 syncer_->SyncShare(job.session.get(), begin, end); 774 premature_exit = !syncer_->SyncShare(job->mutable_session(),
797 has_more_to_sync = job.session->HasMoreToSync(); 775 job->start_step(),
776 job->end_step());
777
778 has_more_to_sync = job->session()->HasMoreToSync();
798 if (has_more_to_sync) 779 if (has_more_to_sync)
799 job.session->PrepareForAnotherSyncCycle(); 780 job->mutable_session()->PrepareForAnotherSyncCycle();
800 } 781 }
801 SDVLOG(2) << "Done SyncShare looping."; 782 SDVLOG(2) << "Done SyncShare looping.";
802 783
803 FinishSyncSessionJob(job); 784 return FinishSyncSessionJob(job.Pass(), premature_exit);
804 } 785 }
805 786
806 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { 787 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) {
807 DCHECK_EQ(MessageLoop::current(), sync_loop_); 788 DCHECK_EQ(MessageLoop::current(), sync_loop_);
808 789
809 // We are interested in recording time between local nudges for datatypes. 790 // We are interested in recording time between local nudges for datatypes.
810 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. 791 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well.
811 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) 792 if (info.updates_source != GetUpdatesCallerInfo::LOCAL)
812 return; 793 return;
813 794
814 base::TimeTicks now = TimeTicks::Now(); 795 base::TimeTicks now = TimeTicks::Now();
815 // Update timing information for how often datatypes are triggering nudges. 796 // Update timing information for how often datatypes are triggering nudges.
816 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin(); 797 for (ModelTypeInvalidationMap::const_iterator iter = info.types.begin();
817 iter != info.types.end(); 798 iter != info.types.end();
818 ++iter) { 799 ++iter) {
819 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; 800 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first];
820 last_local_nudges_by_model_type_[iter->first] = now; 801 last_local_nudges_by_model_type_[iter->first] = now;
821 if (previous.is_null()) 802 if (previous.is_null())
822 continue; 803 continue;
823 804
824 #define PER_DATA_TYPE_MACRO(type_str) \ 805 #define PER_DATA_TYPE_MACRO(type_str) \
825 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); 806 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
826 SYNC_DATA_TYPE_HISTOGRAM(iter->first); 807 SYNC_DATA_TYPE_HISTOGRAM(iter->first);
akalin 2012/10/26 06:52:29 indent?
827 #undef PER_DATA_TYPE_MACRO 808 #undef PER_DATA_TYPE_MACRO
828 } 809 }
829 } 810 }
830 811
831 void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) { 812 bool SyncSchedulerImpl::FinishSyncSessionJob(scoped_ptr<SyncSessionJob> job,
813 bool exited_prematurely) {
832 DCHECK_EQ(MessageLoop::current(), sync_loop_); 814 DCHECK_EQ(MessageLoop::current(), sync_loop_);
833 // Now update the status of the connection from SCM. We need this to decide 815 // Now update the status of the connection from SCM. We need this to decide
834 // whether we need to save/run future jobs. The notifications from SCM are not 816 // whether we need to save/run future jobs. The notifications from SCM are
835 // reliable. 817 // not reliable.
836 // 818 //
837 // TODO(rlarocque): crbug.com/110954 819 // TODO(rlarocque): crbug.com/110954
838 // We should get rid of the notifications and it is probably not needed to 820 // We should get rid of the notifications and it is probably not needed to
839 // maintain this status variable in 2 places. We should query it directly from 821 // maintain this status variable in 2 places. We should query it directly
840 // SCM when needed. 822 // from SCM when needed.
841 ServerConnectionManager* scm = session_context_->connection_manager(); 823 ServerConnectionManager* scm = session_context_->connection_manager();
842 UpdateServerConnectionManagerStatus(scm->server_status()); 824 UpdateServerConnectionManagerStatus(scm->server_status());
843 825
844 if (IsSyncingCurrentlySilenced()) { 826 // Let job know that we're through syncing (calling SyncShare) at this point.
845 SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; 827 bool succeeded = false;
846 // TODO(sync): Investigate whether we need to check job.purpose 828 {
847 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.)
848 SaveJob(job);
849 return; // Nothing to do.
850 } else if (job.session->Succeeded() &&
851 !job.config_params.ready_task.is_null()) {
852 // If this was a configuration job with a ready task, invoke it now that
853 // we finished successfully.
854 AutoReset<bool> protector(&no_scheduling_allowed_, true); 829 AutoReset<bool> protector(&no_scheduling_allowed_, true);
855 job.config_params.ready_task.Run(); 830 succeeded = job->Finish(exited_prematurely);
856 } 831 }
857 832
858 SDVLOG(2) << "Updating the next polling time after SyncMain"; 833 SDVLOG(2) << "Updating the next polling time after SyncMain";
859 ScheduleNextSync(job); 834 ScheduleNextSync(job.Pass(), succeeded);
835 return succeeded;
860 } 836 }
861 837
862 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) { 838 void SyncSchedulerImpl::ScheduleNextSync(
839 scoped_ptr<SyncSessionJob> finished_job, bool succeeded) {
863 DCHECK_EQ(MessageLoop::current(), sync_loop_); 840 DCHECK_EQ(MessageLoop::current(), sync_loop_);
864 DCHECK(!old_job.session->HasMoreToSync()); 841 DCHECK(!finished_job->session()->HasMoreToSync());
865 842
866 AdjustPolling(&old_job); 843 AdjustPolling(finished_job.get());
867 844
868 if (old_job.session->Succeeded()) { 845 if (succeeded) {
869 // Only reset backoff if we actually reached the server. 846 // Only reset backoff if we actually reached the server.
870 if (old_job.session->SuccessfullyReachedServer()) 847 // It's possible that we reached the server on one attempt, then had an
848 // error on the next (or didn't perform some of the server-communicating
849 // commands). We want to verify that, for all commands attempted, we
850 // successfully spoke with the server. Therefore, we verify no errors
851 // and at least one SYNCER_OK.
852 if (finished_job->session()->DidReachServer())
871 wait_interval_.reset(); 853 wait_interval_.reset();
872 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; 854 SDVLOG(2) << "Job succeeded so not scheduling more jobs";
873 return; 855 return;
874 } 856 }
875 857
876 if (old_job.purpose == SyncSessionJob::POLL) { 858 if (IsSyncingCurrentlySilenced()) {
859 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle.";
860 // If we're here, it's because |job| was silenced until a server specified
861 // time. (Note, it had to be |job|, because DecideOnJob would not permit
862 // any job through while in WaitInterval::THROTTLED).
863 scoped_ptr<SyncSessionJob> clone = finished_job->Clone();
864 if (clone->purpose() == SyncSessionJob::NUDGE)
865 pending_nudge_ = clone.get();
866 else if (clone->purpose() == SyncSessionJob::CONFIGURATION)
867 wait_interval_->pending_configure_job = clone.get();
868 else
869 clone.reset(); // Unthrottling is enough, no need to force a canary.
870
871 RestartWaiting(clone.Pass());
872 return;
873 }
874
875 if (finished_job->purpose() == SyncSessionJob::POLL) {
877 return; // We don't retry POLL jobs. 876 return; // We don't retry POLL jobs.
878 } 877 }
879 878
880 // TODO(rlarocque): There's no reason why we should blindly backoff and retry 879 // TODO(rlarocque): There's no reason why we should blindly backoff and retry
881 // if we don't succeed. Some types of errors are not likely to disappear on 880 // if we don't succeed. Some types of errors are not likely to disappear on
882 // their own. With the return values now available in the old_job.session, we 881 // their own. With the return values now available in the old_job.session,
883 // should be able to detect such errors and only retry when we detect 882 // we should be able to detect such errors and only retry when we detect
884 // transient errors. 883 // transient errors.
885 884
886 if (IsBackingOff() && wait_interval_->timer.IsRunning() && 885 if (IsBackingOff() && wait_interval_->timer.IsRunning() &&
887 mode_ == NORMAL_MODE) { 886 mode_ == NORMAL_MODE) {
888 // When in normal mode, we allow up to one nudge per backoff interval. It 887 // When in normal mode, we allow up to one nudge per backoff interval. It
889 // appears that this was our nudge for this interval, and it failed. 888 // appears that this was our nudge for this interval, and it failed.
890 // 889 //
891 // Note: This does not prevent us from running canary jobs. For example, an 890 // Note: This does not prevent us from running canary jobs. For example,
892 // IP address change might still result in another nudge being executed 891 // an IP address change might still result in another nudge being executed
893 // during this backoff interval. 892 // during this backoff interval.
894 SDVLOG(2) << "A nudge during backoff failed"; 893 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge.";
895 894 DCHECK_EQ(SyncSessionJob::NUDGE, finished_job->purpose());
896 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
897 DCHECK(!wait_interval_->had_nudge); 895 DCHECK(!wait_interval_->had_nudge);
898 896
899 wait_interval_->had_nudge = true; 897 wait_interval_->had_nudge = true;
900 InitOrCoalescePendingJob(old_job); 898 DCHECK(!pending_nudge_);
901 RestartWaiting(); 899
900 scoped_ptr<SyncSessionJob> new_job = finished_job->Clone();
901 pending_nudge_ = new_job.get();
902 RestartWaiting(new_job.Pass());
902 } else { 903 } else {
903 // Either this is the first failure or a consecutive failure after our 904 // Either this is the first failure or a consecutive failure after our
904 // backoff timer expired. We handle it the same way in either case. 905 // backoff timer expired. We handle it the same way in either case.
905 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; 906 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed";
906 HandleContinuationError(old_job); 907 HandleContinuationError(finished_job.get());
907 } 908 }
908 } 909 }
909 910
910 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { 911 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) {
911 DCHECK_EQ(MessageLoop::current(), sync_loop_); 912 DCHECK_EQ(MessageLoop::current(), sync_loop_);
912 913
913 TimeDelta poll = (!session_context_->notifications_enabled()) ? 914 TimeDelta poll = (!session_context_->notifications_enabled()) ?
914 syncer_short_poll_interval_seconds_ : 915 syncer_short_poll_interval_seconds_ :
915 syncer_long_poll_interval_seconds_; 916 syncer_long_poll_interval_seconds_;
916 bool rate_changed = !poll_timer_.IsRunning() || 917 bool rate_changed = !poll_timer_.IsRunning() ||
917 poll != poll_timer_.GetCurrentDelay(); 918 poll != poll_timer_.GetCurrentDelay();
918 919
919 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) 920 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed)
920 poll_timer_.Reset(); 921 poll_timer_.Reset();
921 922
922 if (!rate_changed) 923 if (!rate_changed)
923 return; 924 return;
924 925
925 // Adjust poll rate. 926 // Adjust poll rate.
926 poll_timer_.Stop(); 927 poll_timer_.Stop();
927 poll_timer_.Start(FROM_HERE, poll, this, 928 poll_timer_.Start(FROM_HERE, poll, this,
928 &SyncSchedulerImpl::PollTimerCallback); 929 &SyncSchedulerImpl::PollTimerCallback);
929 } 930 }
930 931
931 void SyncSchedulerImpl::RestartWaiting() { 932 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) {
932 CHECK(wait_interval_.get()); 933 CHECK(wait_interval_.get());
933 wait_interval_->timer.Stop(); 934 wait_interval_->timer.Stop();
934 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, 935 DCHECK(wait_interval_->length >= TimeDelta::FromSeconds(0));
935 this, &SyncSchedulerImpl::DoCanaryJob); 936 if (wait_interval_->mode == WaitInterval::THROTTLED) {
937 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
938 base::Bind(&SyncSchedulerImpl::Unthrottle,
939 weak_ptr_factory_.GetWeakPtr(),
940 base::Passed(&job)));
941 } else {
942 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
943 base::Bind(&SyncSchedulerImpl::DoCanaryJob,
944 weak_ptr_factory_.GetWeakPtr(),
945 base::Passed(&job)));
946 }
936 } 947 }
937 948
938 void SyncSchedulerImpl::HandleContinuationError( 949 void SyncSchedulerImpl::HandleContinuationError(
939 const SyncSessionJob& old_job) { 950 const SyncSessionJob* old_job) {
940 DCHECK_EQ(MessageLoop::current(), sync_loop_); 951 DCHECK_EQ(MessageLoop::current(), sync_loop_);
941 if (DCHECK_IS_ON()) { 952 if (DCHECK_IS_ON()) {
942 if (IsBackingOff()) { 953 if (IsBackingOff()) {
943 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); 954 DCHECK(wait_interval_->timer.IsRunning() || old_job->is_canary());
944 } 955 }
945 } 956 }
946 957
947 TimeDelta length = delay_provider_->GetDelay( 958 TimeDelta length = delay_provider_->GetDelay(
948 IsBackingOff() ? wait_interval_->length : 959 IsBackingOff() ? wait_interval_->length :
949 delay_provider_->GetInitialDelay( 960 delay_provider_->GetInitialDelay(
950 old_job.session->status_controller().model_neutral_state())); 961 old_job->session()->status_controller().model_neutral_state()));
951 962
952 SDVLOG(2) << "In handle continuation error with " 963 SDVLOG(2) << "In handle continuation error with "
953 << SyncSessionJob::GetPurposeString(old_job.purpose) 964 << SyncSessionJob::GetPurposeString(old_job->purpose())
954 << " job. The time delta(ms) is " 965 << " job. The time delta(ms) is "
955 << length.InMilliseconds(); 966 << length.InMilliseconds();
956 967
957 // This will reset the had_nudge variable as well. 968 // This will reset the had_nudge variable as well.
958 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, 969 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
959 length)); 970 length));
960 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { 971 scoped_ptr<SyncSessionJob> new_job(old_job->CloneFromLocation(FROM_HERE));
972 new_job->set_scheduled_start(TimeTicks::Now() + length);
973 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) {
961 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; 974 SDVLOG(2) << "Configuration did not succeed, scheduling retry.";
962 // Config params should always get set. 975 // Config params should always get set.
963 DCHECK(!old_job.config_params.ready_task.is_null()); 976 DCHECK(!old_job->config_params().ready_task.is_null());
964 SyncSession* old = old_job.session.get(); 977 wait_interval_->pending_configure_job = new_job.get();
965 SyncSession* s(new SyncSession(session_context_, this,
966 old->source(), old->routing_info(), old->workers()));
967 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
968 make_linked_ptr(s), false, old_job.config_params,
969 FROM_HERE);
970 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
971 } else { 978 } else {
972 // We are not in configuration mode. So wait_interval's pending job 979 // We are not in configuration mode. So wait_interval's pending job
973 // should be null. 980 // should be null.
974 DCHECK(wait_interval_->pending_configure_job.get() == NULL); 981 DCHECK(wait_interval_->pending_configure_job == NULL);
982 DCHECK(!pending_nudge_);
983 pending_nudge_ = new_job.get();
984 }
975 985
976 // TODO(lipalani) - handle clear user data. 986 RestartWaiting(new_job.Pass());
977 InitOrCoalescePendingJob(old_job);
978 }
979 RestartWaiting();
980 } 987 }
981 988
982 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { 989 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) {
983 syncer_->RequestEarlyExit(); // Safe to call from any thread. 990 syncer_->RequestEarlyExit(); // Safe to call from any thread.
984 DCHECK(weak_handle_this_.IsInitialized()); 991 DCHECK(weak_handle_this_.IsInitialized());
985 SDVLOG(3) << "Posting StopImpl"; 992 SDVLOG(3) << "Posting StopImpl";
986 weak_handle_this_.Call(FROM_HERE, 993 weak_handle_this_.Call(FROM_HERE,
987 &SyncSchedulerImpl::StopImpl, 994 &SyncSchedulerImpl::StopImpl,
988 callback); 995 callback);
989 } 996 }
990 997
991 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { 998 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
992 DCHECK_EQ(MessageLoop::current(), sync_loop_); 999 DCHECK_EQ(MessageLoop::current(), sync_loop_);
993 SDVLOG(2) << "StopImpl called"; 1000 SDVLOG(2) << "StopImpl called";
994 1001
995 // Kill any in-flight method calls. 1002 // Kill any in-flight method calls.
996 weak_ptr_factory_.InvalidateWeakPtrs(); 1003 weak_ptr_factory_.InvalidateWeakPtrs();
997 wait_interval_.reset(); 1004 wait_interval_.reset();
998 poll_timer_.Stop(); 1005 poll_timer_.Stop();
999 if (started_) { 1006 if (started_) {
1000 started_ = false; 1007 started_ = false;
1001 } 1008 }
1002 if (!callback.is_null()) 1009 if (!callback.is_null())
1003 callback.Run(); 1010 callback.Run();
1004 } 1011 }
1005 1012
1006 void SyncSchedulerImpl::DoCanaryJob() { 1013 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) {
1007 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1014 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1008 SDVLOG(2) << "Do canary job"; 1015 SDVLOG(2) << "Do canary job";
1009 DoPendingJobIfPossible(true); 1016
1017 // Only set canary privileges here, when we are about to run the job. This
1018 // avoids confusion in managing canary bits during scheduling, when you
1019 // consider that mode switches (e.g., to config) can "pre-empt" a NUDGE that
1020 // was scheduled as canary, and send it to an "unscheduled" state.
1021 to_be_canary->GrantCanaryPrivilege();
1022
1023 if (to_be_canary->purpose() == SyncSessionJob::NUDGE) {
1024 // TODO(tim): We should be able to remove this...
1025 scoped_ptr<SyncSession> temp = CreateSyncSession(
1026 to_be_canary->session()->source()).Pass();
1027 // The routing info might have been changed since we cached the
1028 // pending nudge. Update it by coalescing to the latest.
1029 to_be_canary->mutable_session()->Coalesce(*(temp));
1030 }
1031 DoSyncSessionJob(to_be_canary.Pass());
1010 } 1032 }
1011 1033
1012 void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) { 1034 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() {
1013 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1035 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1014 SyncSessionJob* job_to_execute = NULL; 1036 // If we find a scheduled pending_ job, abandon the old one and return a
1037 // a clone. If unscheduled, just hand over ownership.
1038 scoped_ptr<SyncSessionJob> candidate;
1015 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() 1039 if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
1016 && wait_interval_->pending_configure_job.get()) { 1040 && wait_interval_->pending_configure_job) {
1017 SDVLOG(2) << "Found pending configure job"; 1041 SDVLOG(2) << "Found pending configure job";
1018 job_to_execute = wait_interval_->pending_configure_job.get(); 1042 candidate =
1019 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { 1043 wait_interval_->pending_configure_job->CloneAndAbandon().Pass();
1044 wait_interval_->pending_configure_job = candidate.get();
1045 } else if (mode_ == NORMAL_MODE && pending_nudge_) {
1020 SDVLOG(2) << "Found pending nudge job"; 1046 SDVLOG(2) << "Found pending nudge job";
1021 1047 candidate = pending_nudge_->CloneAndAbandon();
1022 scoped_ptr<SyncSession> session(CreateSyncSession( 1048 pending_nudge_ = candidate.get();
1023 pending_nudge_->session->source())); 1049 unscheduled_nudge_storage_.reset();
1024
1025 // Also the routing info might have been changed since we cached the
1026 // pending nudge. Update it by coalescing to the latest.
1027 pending_nudge_->session->Coalesce(*(session.get()));
1028 // The pending nudge would be cleared in the DoSyncSessionJob function.
1029 job_to_execute = pending_nudge_.get();
1030 } 1050 }
1031 1051 return candidate.Pass();
1032 if (job_to_execute != NULL) {
1033 SDVLOG(2) << "Executing pending job";
1034 SyncSessionJob copy = *job_to_execute;
1035 copy.is_canary_job = is_canary_job;
1036 DoSyncSessionJob(copy);
1037 }
1038 } 1052 }
1039 1053
1040 SyncSession* SyncSchedulerImpl::CreateSyncSession( 1054 scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession(
1041 const SyncSourceInfo& source) { 1055 const SyncSourceInfo& source) {
1042 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1056 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1043 DVLOG(2) << "Creating sync session with routes " 1057 DVLOG(2) << "Creating sync session with routes "
1044 << ModelSafeRoutingInfoToString(session_context_->routing_info()); 1058 << ModelSafeRoutingInfoToString(session_context_->routing_info());
1045 1059
1046 SyncSourceInfo info(source); 1060 SyncSourceInfo info(source);
1047 SyncSession* session(new SyncSession(session_context_, this, info, 1061 return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info,
1048 session_context_->routing_info(), session_context_->workers())); 1062 session_context_->routing_info(), session_context_->workers()));
1049
1050 return session;
1051 } 1063 }
1052 1064
1053 void SyncSchedulerImpl::PollTimerCallback() { 1065 void SyncSchedulerImpl::PollTimerCallback() {
1054 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1066 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1055 ModelSafeRoutingInfo r; 1067 ModelSafeRoutingInfo r;
1056 ModelTypeInvalidationMap invalidation_map = 1068 ModelTypeInvalidationMap invalidation_map =
1057 ModelSafeRoutingInfoToInvalidationMap(r, std::string()); 1069 ModelSafeRoutingInfoToInvalidationMap(r, std::string());
1058 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map); 1070 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, invalidation_map);
1059 SyncSession* s = CreateSyncSession(info); 1071 scoped_ptr<SyncSession> s(CreateSyncSession(info));
1060 1072 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL,
1061 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), 1073 TimeTicks::Now(),
1062 make_linked_ptr(s), 1074 s.Pass(),
1063 false, 1075 ConfigurationParams(),
1064 ConfigurationParams(), 1076 FROM_HERE));
1065 FROM_HERE); 1077 ScheduleSyncSessionJob(job.Pass());
1066
1067 ScheduleSyncSessionJob(job);
1068 } 1078 }
1069 1079
1070 void SyncSchedulerImpl::Unthrottle() { 1080 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) {
1071 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1081 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1072 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); 1082 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
1073 SDVLOG(2) << "Unthrottled."; 1083 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ")
1074 DoCanaryJob(); 1084 << "canary.";
1085 if (to_be_canary.get())
1086 DoCanaryJob(to_be_canary.Pass());
1087
1088 // TODO(tim): The way DecideOnJob works today, canary privileges aren't
1089 // enough to bypass a THROTTLED wait interval, which would suggest we need
1090 // to reset before DoCanaryJob (though trusting canary in DecideOnJob is
1091 // probably the "right" thing to do). Bug 154216.
1075 wait_interval_.reset(); 1092 wait_interval_.reset();
1076 } 1093 }
1077 1094
1078 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { 1095 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) {
1079 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1096 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1080 session_context_->NotifyListeners(SyncEngineEvent(cause)); 1097 session_context_->NotifyListeners(SyncEngineEvent(cause));
1081 } 1098 }
1082 1099
1083 bool SyncSchedulerImpl::IsBackingOff() const { 1100 bool SyncSchedulerImpl::IsBackingOff() const {
1084 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1101 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1085 return wait_interval_.get() && wait_interval_->mode == 1102 return wait_interval_.get() && wait_interval_->mode ==
1086 WaitInterval::EXPONENTIAL_BACKOFF; 1103 WaitInterval::EXPONENTIAL_BACKOFF;
1087 } 1104 }
1088 1105
1089 void SyncSchedulerImpl::OnSilencedUntil( 1106 void SyncSchedulerImpl::OnSilencedUntil(
1090 const base::TimeTicks& silenced_until) { 1107 const base::TimeTicks& silenced_until) {
1091 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1108 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1092 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, 1109 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
1093 silenced_until - TimeTicks::Now())); 1110 silenced_until - TimeTicks::Now()));
1094 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this,
1095 &SyncSchedulerImpl::Unthrottle);
1096 } 1111 }
1097 1112
1098 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { 1113 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() {
1099 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1114 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1100 return wait_interval_.get() && wait_interval_->mode == 1115 return wait_interval_.get() && wait_interval_->mode ==
1101 WaitInterval::THROTTLED; 1116 WaitInterval::THROTTLED;
1102 } 1117 }
1103 1118
1104 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( 1119 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
1105 const base::TimeDelta& new_interval) { 1120 const base::TimeDelta& new_interval) {
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
1159 1174
1160 #undef SDVLOG_LOC 1175 #undef SDVLOG_LOC
1161 1176
1162 #undef SDVLOG 1177 #undef SDVLOG
1163 1178
1164 #undef SLOG 1179 #undef SLOG
1165 1180
1166 #undef ENUM_CASE 1181 #undef ENUM_CASE
1167 1182
1168 } // namespace syncer 1183 } // namespace syncer
OLDNEW

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld 1280:2d3e6564b7b6