Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(98)

Side by Side Diff: sync/engine/sync_scheduler_impl.cc

Issue 10917234: sync: make scheduling logic and job ownership more obvious. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: test + comment + rebase Created 8 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/engine/sync_scheduler_impl.h" 5 #include "sync/engine/sync_scheduler_impl.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 #include <cstring> 8 #include <cstring>
9 9
10 #include "base/auto_reset.h" 10 #include "base/auto_reset.h"
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/bind_helpers.h"
12 #include "base/compiler_specific.h" 13 #include "base/compiler_specific.h"
13 #include "base/location.h" 14 #include "base/location.h"
14 #include "base/logging.h" 15 #include "base/logging.h"
15 #include "base/message_loop.h" 16 #include "base/message_loop.h"
16 #include "sync/engine/backoff_delay_provider.h" 17 #include "sync/engine/backoff_delay_provider.h"
17 #include "sync/engine/syncer.h" 18 #include "sync/engine/syncer.h"
18 #include "sync/engine/throttled_data_type_tracker.h" 19 #include "sync/engine/throttled_data_type_tracker.h"
19 #include "sync/protocol/proto_enum_conversions.h" 20 #include "sync/protocol/proto_enum_conversions.h"
20 #include "sync/protocol/sync.pb.h" 21 #include "sync/protocol/sync.pb.h"
21 #include "sync/util/data_type_histogram.h" 22 #include "sync/util/data_type_histogram.h"
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
76 : source(source), 77 : source(source),
77 types_to_download(types_to_download), 78 types_to_download(types_to_download),
78 routing_info(routing_info), 79 routing_info(routing_info),
79 ready_task(ready_task) { 80 ready_task(ready_task) {
80 DCHECK(!ready_task.is_null()); 81 DCHECK(!ready_task.is_null());
81 } 82 }
82 ConfigurationParams::~ConfigurationParams() {} 83 ConfigurationParams::~ConfigurationParams() {}
83 84
84 SyncSchedulerImpl::WaitInterval::WaitInterval() 85 SyncSchedulerImpl::WaitInterval::WaitInterval()
85 : mode(UNKNOWN), 86 : mode(UNKNOWN),
86 had_nudge(false) { 87 had_nudge(false),
88 pending_configure_job(NULL) {
akalin 2012/09/25 22:37:24 three different styles for empty function bodies h
tim (not reviewing) 2012/10/08 00:20:03 Done.
87 } 89 }
88 90
91 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
92 : mode(mode), had_nudge(false), length(length),
93 pending_configure_job(NULL) { }
94
89 SyncSchedulerImpl::WaitInterval::~WaitInterval() {} 95 SyncSchedulerImpl::WaitInterval::~WaitInterval() {}
90 96
91 #define ENUM_CASE(x) case x: return #x; break; 97 #define ENUM_CASE(x) case x: return #x; break;
92 98
93 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) { 99 const char* SyncSchedulerImpl::WaitInterval::GetModeString(Mode mode) {
94 switch (mode) { 100 switch (mode) {
95 ENUM_CASE(UNKNOWN); 101 ENUM_CASE(UNKNOWN);
96 ENUM_CASE(EXPONENTIAL_BACKOFF); 102 ENUM_CASE(EXPONENTIAL_BACKOFF);
97 ENUM_CASE(THROTTLED); 103 ENUM_CASE(THROTTLED);
98 } 104 }
99 NOTREACHED(); 105 NOTREACHED();
100 return ""; 106 return "";
101 } 107 }
102 108
103 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob()
104 : purpose(UNKNOWN),
105 is_canary_job(false) {
106 }
107
108 SyncSchedulerImpl::SyncSessionJob::~SyncSessionJob() {}
109
110 SyncSchedulerImpl::SyncSessionJob::SyncSessionJob(SyncSessionJobPurpose purpose,
111 base::TimeTicks start,
112 linked_ptr<sessions::SyncSession> session,
113 bool is_canary_job,
114 const ConfigurationParams& config_params,
115 const tracked_objects::Location& from_here)
116 : purpose(purpose),
117 scheduled_start(start),
118 session(session),
119 is_canary_job(is_canary_job),
120 config_params(config_params),
121 from_here(from_here) {
122 }
123
124 const char* SyncSchedulerImpl::SyncSessionJob::GetPurposeString(
125 SyncSchedulerImpl::SyncSessionJob::SyncSessionJobPurpose purpose) {
126 switch (purpose) {
127 ENUM_CASE(UNKNOWN);
128 ENUM_CASE(POLL);
129 ENUM_CASE(NUDGE);
130 ENUM_CASE(CONFIGURATION);
131 }
132 NOTREACHED();
133 return "";
134 }
135
136 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( 109 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
137 NudgeSource source) { 110 NudgeSource source) {
138 switch (source) { 111 switch (source) {
139 case NUDGE_SOURCE_NOTIFICATION: 112 case NUDGE_SOURCE_NOTIFICATION:
140 return GetUpdatesCallerInfo::NOTIFICATION; 113 return GetUpdatesCallerInfo::NOTIFICATION;
141 case NUDGE_SOURCE_LOCAL: 114 case NUDGE_SOURCE_LOCAL:
142 return GetUpdatesCallerInfo::LOCAL; 115 return GetUpdatesCallerInfo::LOCAL;
143 case NUDGE_SOURCE_CONTINUATION: 116 case NUDGE_SOURCE_CONTINUATION:
144 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; 117 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
145 case NUDGE_SOURCE_LOCAL_REFRESH: 118 case NUDGE_SOURCE_LOCAL_REFRESH:
146 return GetUpdatesCallerInfo::DATATYPE_REFRESH; 119 return GetUpdatesCallerInfo::DATATYPE_REFRESH;
147 case NUDGE_SOURCE_UNKNOWN: 120 case NUDGE_SOURCE_UNKNOWN:
148 return GetUpdatesCallerInfo::UNKNOWN; 121 return GetUpdatesCallerInfo::UNKNOWN;
149 default: 122 default:
150 NOTREACHED(); 123 NOTREACHED();
151 return GetUpdatesCallerInfo::UNKNOWN; 124 return GetUpdatesCallerInfo::UNKNOWN;
152 } 125 }
153 } 126 }
154 127
155 SyncSchedulerImpl::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
156 : mode(mode), had_nudge(false), length(length) { }
157
158 // Helper macros to log with the syncer thread name; useful when there 128 // Helper macros to log with the syncer thread name; useful when there
159 // are multiple syncer threads involved. 129 // are multiple syncer threads involved.
160 130
161 #define SLOG(severity) LOG(severity) << name_ << ": " 131 #define SLOG(severity) LOG(severity) << name_ << ": "
162 132
163 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": " 133 #define SDVLOG(verbose_level) DVLOG(verbose_level) << name_ << ": "
164 134
165 #define SDVLOG_LOC(from_here, verbose_level) \ 135 #define SDVLOG_LOC(from_here, verbose_level) \
166 DVLOG_LOC(from_here, verbose_level) << name_ << ": " 136 DVLOG_LOC(from_here, verbose_level) << name_ << ": "
167 137
(...skipping 30 matching lines...) Expand all
198 syncer_short_poll_interval_seconds_( 168 syncer_short_poll_interval_seconds_(
199 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), 169 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
200 syncer_long_poll_interval_seconds_( 170 syncer_long_poll_interval_seconds_(
201 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), 171 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
202 sessions_commit_delay_( 172 sessions_commit_delay_(
203 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)), 173 TimeDelta::FromSeconds(kDefaultSessionsCommitDelaySeconds)),
204 mode_(NORMAL_MODE), 174 mode_(NORMAL_MODE),
205 // Start with assuming everything is fine with the connection. 175 // Start with assuming everything is fine with the connection.
206 // At the end of the sync cycle we would have the correct status. 176 // At the end of the sync cycle we would have the correct status.
207 connection_code_(HttpResponse::SERVER_CONNECTION_OK), 177 connection_code_(HttpResponse::SERVER_CONNECTION_OK),
178 pending_nudge_(NULL),
208 delay_provider_(delay_provider), 179 delay_provider_(delay_provider),
209 syncer_(syncer), 180 syncer_(syncer),
210 session_context_(context), 181 session_context_(context),
211 no_scheduling_allowed_(false) { 182 no_scheduling_allowed_(false) {
212 DCHECK(sync_loop_); 183 DCHECK(sync_loop_);
213 } 184 }
214 185
215 SyncSchedulerImpl::~SyncSchedulerImpl() { 186 SyncSchedulerImpl::~SyncSchedulerImpl() {
216 DCHECK_EQ(MessageLoop::current(), sync_loop_); 187 DCHECK_EQ(MessageLoop::current(), sync_loop_);
217 StopImpl(base::Closure()); 188 StopImpl(base::Closure());
(...skipping 16 matching lines...) Expand all
234 void SyncSchedulerImpl::OnConnectionStatusChange() { 205 void SyncSchedulerImpl::OnConnectionStatusChange() {
235 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) { 206 if (HttpResponse::CONNECTION_UNAVAILABLE == connection_code_) {
236 // Optimistically assume that the connection is fixed and try 207 // Optimistically assume that the connection is fixed and try
237 // connecting. 208 // connecting.
238 OnServerConnectionErrorFixed(); 209 OnServerConnectionErrorFixed();
239 } 210 }
240 } 211 }
241 212
242 void SyncSchedulerImpl::OnServerConnectionErrorFixed() { 213 void SyncSchedulerImpl::OnServerConnectionErrorFixed() {
243 connection_code_ = HttpResponse::SERVER_CONNECTION_OK; 214 connection_code_ = HttpResponse::SERVER_CONNECTION_OK;
215 // There could be a pending nudge or configuration job in several cases:
216 //
217 // 1. We're in exponential backoff.
218 // 2. We're silenced / throttled.
219 // 3. A nudge was saved previously due to not having a valid auth token.
220 // 4. A nudge was scheduled + saved while in configuration mode.
221 //
222 // In all cases except (2), we want to retry contacting the server. We
223 // call DoCanaryJob to achieve this, and note that nothing -- not even a
224 // canary job -- can bypass a THROTTLED WaitInterval. The only thing that
225 // has the authority to do that is the Unthrottle timer.
226 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
227 if (!pending.get())
228 return;
229
244 PostTask(FROM_HERE, "DoCanaryJob", 230 PostTask(FROM_HERE, "DoCanaryJob",
245 base::Bind(&SyncSchedulerImpl::DoCanaryJob, 231 base::Bind(&SyncSchedulerImpl::DoCanaryJob,
246 weak_ptr_factory_.GetWeakPtr())); 232 weak_ptr_factory_.GetWeakPtr(),
247 233 base::Passed(&pending)));
248 } 234 }
249 235
250 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus( 236 void SyncSchedulerImpl::UpdateServerConnectionManagerStatus(
251 HttpResponse::ServerConnectionCode code) { 237 HttpResponse::ServerConnectionCode code) {
252 DCHECK_EQ(MessageLoop::current(), sync_loop_); 238 DCHECK_EQ(MessageLoop::current(), sync_loop_);
253 SDVLOG(2) << "New server connection code: " 239 SDVLOG(2) << "New server connection code: "
254 << HttpResponse::GetServerConnectionCodeString(code); 240 << HttpResponse::GetServerConnectionCodeString(code);
255 241
256 connection_code_ = code; 242 connection_code_ = code;
257 } 243 }
(...skipping 15 matching lines...) Expand all
273 Mode old_mode = mode_; 259 Mode old_mode = mode_;
274 mode_ = mode; 260 mode_ = mode;
275 AdjustPolling(NULL); // Will kick start poll timer if needed. 261 AdjustPolling(NULL); // Will kick start poll timer if needed.
276 262
277 if (old_mode != mode_) { 263 if (old_mode != mode_) {
278 // We just changed our mode. See if there are any pending jobs that we could 264 // We just changed our mode. See if there are any pending jobs that we could
279 // execute in the new mode. 265 // execute in the new mode.
280 if (mode_ == NORMAL_MODE) { 266 if (mode_ == NORMAL_MODE) {
281 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job 267 // It is illegal to switch to NORMAL_MODE if a previous CONFIGURATION job
282 // has not yet completed. 268 // has not yet completed.
283 DCHECK(!wait_interval_.get() || 269 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job);
284 !wait_interval_->pending_configure_job.get());
285 } 270 }
286 271
287 DoPendingJobIfPossible(false); 272 scoped_ptr<SyncSessionJob> pending(TakePendingJobForCurrentMode());
273 if (pending.get()) {
274 // TODO(tim): We should be able to remove this...
275 scoped_ptr<SyncSession> session(CreateSyncSession(
276 pending->session()->source()));
277 // Also the routing info might have been changed since we cached the
278 // pending nudge. Update it by coalescing to the latest.
279 pending->mutable_session()->Coalesce(*(session));
280 SDVLOG(2) << "Executing pending job. Good luck!";
281 DoSyncSessionJob(pending.Pass());
282 }
288 } 283 }
289 } 284 }
290 285
291 void SyncSchedulerImpl::SendInitialSnapshot() { 286 void SyncSchedulerImpl::SendInitialSnapshot() {
292 DCHECK_EQ(MessageLoop::current(), sync_loop_); 287 DCHECK_EQ(MessageLoop::current(), sync_loop_);
293 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this, 288 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_, this,
294 SyncSourceInfo(), ModelSafeRoutingInfo(), 289 SyncSourceInfo(), ModelSafeRoutingInfo(),
295 std::vector<ModelSafeWorker*>())); 290 std::vector<ModelSafeWorker*>()));
296 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); 291 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
297 event.snapshot = dummy->TakeSnapshot(); 292 event.snapshot = dummy->TakeSnapshot();
(...skipping 23 matching lines...) Expand all
321 bool SyncSchedulerImpl::ScheduleConfiguration( 316 bool SyncSchedulerImpl::ScheduleConfiguration(
322 const ConfigurationParams& params) { 317 const ConfigurationParams& params) {
323 DCHECK_EQ(MessageLoop::current(), sync_loop_); 318 DCHECK_EQ(MessageLoop::current(), sync_loop_);
324 DCHECK(IsConfigRelatedUpdateSourceValue(params.source)); 319 DCHECK(IsConfigRelatedUpdateSourceValue(params.source));
325 DCHECK_EQ(CONFIGURATION_MODE, mode_); 320 DCHECK_EQ(CONFIGURATION_MODE, mode_);
326 DCHECK(!params.ready_task.is_null()); 321 DCHECK(!params.ready_task.is_null());
327 SDVLOG(2) << "Reconfiguring syncer."; 322 SDVLOG(2) << "Reconfiguring syncer.";
328 323
329 // Only one configuration is allowed at a time. Verify we're not waiting 324 // Only one configuration is allowed at a time. Verify we're not waiting
330 // for a pending configure job. 325 // for a pending configure job.
331 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job.get()); 326 DCHECK(!wait_interval_.get() || !wait_interval_->pending_configure_job);
332 327
333 ModelSafeRoutingInfo restricted_routes; 328 ModelSafeRoutingInfo restricted_routes;
334 BuildModelSafeParams(params.types_to_download, 329 BuildModelSafeParams(params.types_to_download,
335 params.routing_info, 330 params.routing_info,
336 &restricted_routes); 331 &restricted_routes);
337 session_context_->set_routing_info(params.routing_info); 332 session_context_->set_routing_info(params.routing_info);
338 333
339 // Only reconfigure if we have types to download. 334 // Only reconfigure if we have types to download.
340 if (!params.types_to_download.Empty()) { 335 if (!params.types_to_download.Empty()) {
341 DCHECK(!restricted_routes.empty()); 336 DCHECK(!restricted_routes.empty());
342 linked_ptr<SyncSession> session(new SyncSession( 337 scoped_ptr<SyncSession> session(new SyncSession(
343 session_context_, 338 session_context_,
344 this, 339 this,
345 SyncSourceInfo(params.source, 340 SyncSourceInfo(params.source,
346 ModelSafeRoutingInfoToStateMap( 341 ModelSafeRoutingInfoToStateMap(
347 restricted_routes, 342 restricted_routes,
348 std::string())), 343 std::string())),
349 restricted_routes, 344 restricted_routes,
350 session_context_->workers())); 345 session_context_->workers()));
351 SyncSessionJob job(SyncSessionJob::CONFIGURATION, 346 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(
352 TimeTicks::Now(), 347 SyncSessionJob::CONFIGURATION,
353 session, 348 TimeTicks::Now(),
354 false, 349 session.Pass(),
355 params, 350 params,
356 FROM_HERE); 351 FROM_HERE));
357 DoSyncSessionJob(job); 352 bool succeeded = DoSyncSessionJob(job.Pass());
358 353
359 // If we failed, the job would have been saved as the pending configure 354 // If we failed, the job would have been saved as the pending configure
360 // job and a wait interval would have been set. 355 // job and a wait interval would have been set.
361 if (!session->Succeeded()) { 356 if (!succeeded) {
362 DCHECK(wait_interval_.get() && 357 DCHECK(wait_interval_.get() && wait_interval_->pending_configure_job);
363 wait_interval_->pending_configure_job.get());
364 return false; 358 return false;
365 } 359 }
366 } else { 360 } else {
367 SDVLOG(2) << "No change in routing info, calling ready task directly."; 361 SDVLOG(2) << "No change in routing info, calling ready task directly.";
368 params.ready_task.Run(); 362 params.ready_task.Run();
369 } 363 }
370 364
371 return true; 365 return true;
372 } 366 }
373 367
374 SyncSchedulerImpl::JobProcessDecision 368 SyncSchedulerImpl::JobProcessDecision
375 SyncSchedulerImpl::DecideWhileInWaitInterval( 369 SyncSchedulerImpl::DecideWhileInWaitInterval(const SyncSessionJob* job) {
376 const SyncSessionJob& job) {
377 DCHECK_EQ(MessageLoop::current(), sync_loop_); 370 DCHECK_EQ(MessageLoop::current(), sync_loop_);
378 DCHECK(wait_interval_.get()); 371 DCHECK(wait_interval_.get());
379 372
380 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode " 373 SDVLOG(2) << "DecideWhileInWaitInterval with WaitInterval mode "
381 << WaitInterval::GetModeString(wait_interval_->mode) 374 << WaitInterval::GetModeString(wait_interval_->mode)
382 << (wait_interval_->had_nudge ? " (had nudge)" : "") 375 << (wait_interval_->had_nudge ? " (had nudge)" : "")
383 << (job.is_canary_job ? " (canary)" : ""); 376 << (job->is_canary() ? " (canary)" : "");
384 377
385 if (job.purpose == SyncSessionJob::POLL) 378 if (job->purpose() == SyncSessionJob::POLL)
386 return DROP; 379 return DROP;
387 380
388 DCHECK(job.purpose == SyncSessionJob::NUDGE || 381 // If we save a job while in a WaitInterval, there is a well-defined moment
389 job.purpose == SyncSessionJob::CONFIGURATION); 382 // in time in the future when it makes sense for that SAVE-worthy job to try
383 // running again -- the end of the WaitInterval.
384 DCHECK(job->purpose() == SyncSessionJob::NUDGE ||
385 job->purpose() == SyncSessionJob::CONFIGURATION);
386
387 // If throttled, there's a clock ticking to unthrottle. We want to get
388 // on the same train.
390 if (wait_interval_->mode == WaitInterval::THROTTLED) 389 if (wait_interval_->mode == WaitInterval::THROTTLED)
391 return SAVE; 390 return SAVE;
392 391
393 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); 392 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
394 if (job.purpose == SyncSessionJob::NUDGE) { 393 if (job->purpose() == SyncSessionJob::NUDGE) {
395 if (mode_ == CONFIGURATION_MODE) 394 if (mode_ == CONFIGURATION_MODE)
396 return SAVE; 395 return SAVE;
397 396
398 // If we already had one nudge then just drop this nudge. We will retry 397 // If we already had one nudge then just drop this nudge. We will retry
399 // later when the timer runs out. 398 // later when the timer runs out.
400 if (!job.is_canary_job) 399 if (!job->is_canary())
401 return wait_interval_->had_nudge ? DROP : CONTINUE; 400 return wait_interval_->had_nudge ? DROP : CONTINUE;
402 else // We are here because timer ran out. So retry. 401 else // We are here because timer ran out. So retry.
403 return CONTINUE; 402 return CONTINUE;
404 } 403 }
405 return job.is_canary_job ? CONTINUE : SAVE; 404 return job->is_canary() ? CONTINUE : SAVE;
406 } 405 }
407 406
408 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob( 407 SyncSchedulerImpl::JobProcessDecision SyncSchedulerImpl::DecideOnJob(
409 const SyncSessionJob& job) { 408 const SyncSessionJob* job) {
410 DCHECK_EQ(MessageLoop::current(), sync_loop_); 409 DCHECK_EQ(MessageLoop::current(), sync_loop_);
411 410
412 // See if our type is throttled. 411 // See if our type is throttled.
413 ModelTypeSet throttled_types = 412 ModelTypeSet throttled_types =
414 session_context_->throttled_data_type_tracker()->GetThrottledTypes(); 413 session_context_->throttled_data_type_tracker()->GetThrottledTypes();
415 if (job.purpose == SyncSessionJob::NUDGE && 414 if (job->purpose() == SyncSessionJob::NUDGE &&
416 job.session->source().updates_source == GetUpdatesCallerInfo::LOCAL) { 415 job->session()->source().updates_source == GetUpdatesCallerInfo::LOCAL) {
417 ModelTypeSet requested_types; 416 ModelTypeSet requested_types;
418 for (ModelTypeStateMap::const_iterator i = 417 for (ModelTypeStateMap::const_iterator i =
419 job.session->source().types.begin(); 418 job->session()->source().types.begin();
420 i != job.session->source().types.end(); 419 i != job->session()->source().types.end();
421 ++i) { 420 ++i) {
422 requested_types.Put(i->first); 421 requested_types.Put(i->first);
423 } 422 }
424 423
424 // If all types are throttled, do not CONTINUE. Today, we don't treat
425 // a per-datatype "unthrottle" event as something that should force a
426 // canary job. For this reason, there's no good time to reschedule this job
427 // to run -- we'll lazily wait for an independent event to trigger a sync.
428 // Note that there may already be such an event if we're in a WaitInterval,
429 // so we can retry it then.
425 if (!requested_types.Empty() && throttled_types.HasAll(requested_types)) 430 if (!requested_types.Empty() && throttled_types.HasAll(requested_types))
rlarocque 2012/09/24 21:15:14 This isn't related to your change, but your commen
tim (not reviewing) 2012/10/08 00:20:03 Can you define 'forget'? The job would have accoun
rlarocque 2012/10/08 19:18:04 When we calculate the items to commit in GetCommit
tim (not reviewing) 2012/10/11 17:35:14 Hm, I see. Good point. I filed crbug.com/155296 t
426 return SAVE; 431 return SAVE;
427 } 432 }
428 433
429 if (wait_interval_.get()) 434 if (wait_interval_.get())
430 return DecideWhileInWaitInterval(job); 435 return DecideWhileInWaitInterval(job);
431 436
432 if (mode_ == CONFIGURATION_MODE) { 437 if (mode_ == CONFIGURATION_MODE) {
433 if (job.purpose == SyncSessionJob::NUDGE) 438 if (job->purpose() == SyncSessionJob::NUDGE)
434 return SAVE; 439 return SAVE; // Running requires a mode switch.
435 else if (job.purpose == SyncSessionJob::CONFIGURATION) 440 else if (job->purpose() == SyncSessionJob::CONFIGURATION)
436 return CONTINUE; 441 return CONTINUE;
437 else 442 else
438 return DROP; 443 return DROP;
439 } 444 }
440 445
441 // We are in normal mode. 446 // We are in normal mode.
442 DCHECK_EQ(mode_, NORMAL_MODE); 447 DCHECK_EQ(mode_, NORMAL_MODE);
443 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); 448 DCHECK_NE(job->purpose(), SyncSessionJob::CONFIGURATION);
444 449
445 // Note about some subtle scheduling semantics. 450 // Note about some subtle scheduling semantics.
446 // 451 //
447 // It's possible at this point that |job| is known to be unnecessary, and 452 // It's possible at this point that |job| is known to be unnecessary, and
448 // dropping it would be perfectly safe and correct. Consider 453 // dropping it would be perfectly safe and correct. Consider
449 // 454 //
450 // 1) |job| is a POLL with a |scheduled_start| time that is less than 455 // 1) |job| is a POLL with a |scheduled_start| time that is less than
451 // the time that the last successful all-datatype NUDGE completed. 456 // the time that the last successful all-datatype NUDGE completed.
452 // 457 //
453 // 2) |job| is a NUDGE (for any combination of types) with a 458 // 2) |job| is a NUDGE (for any combination of types) with a
(...skipping 22 matching lines...) Expand all
476 // * It's not strictly "impossible", but it would be reentrant and hence 481 // * It's not strictly "impossible", but it would be reentrant and hence
477 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a 482 // illegal. e.g. scheduling a job and re-entering the SyncScheduler is NOT a
478 // legal side effect of any of the work being done as part of a sync cycle. 483 // legal side effect of any of the work being done as part of a sync cycle.
479 // See |no_scheduling_allowed_| for details. 484 // See |no_scheduling_allowed_| for details.
480 485
481 // Decision now rests on state of auth tokens. 486 // Decision now rests on state of auth tokens.
482 if (!session_context_->connection_manager()->HasInvalidAuthToken()) 487 if (!session_context_->connection_manager()->HasInvalidAuthToken())
483 return CONTINUE; 488 return CONTINUE;
484 489
485 SDVLOG(2) << "No valid auth token. Using that to decide on job."; 490 SDVLOG(2) << "No valid auth token. Using that to decide on job.";
486 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; 491 // Running the job would require updated auth, so we can't honour
492 // job.scheduled_start().
493 return job->purpose() == SyncSessionJob::NUDGE ? SAVE : DROP;
487 } 494 }
488 495
489 void SyncSchedulerImpl::InitOrCoalescePendingJob(const SyncSessionJob& job) { 496 bool SyncSchedulerImpl::ShouldRunJobSaveIfNecessary(SyncSessionJob* job) {
490 DCHECK_EQ(MessageLoop::current(), sync_loop_);
491 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
492 if (pending_nudge_.get() == NULL) {
493 SDVLOG(2) << "Creating a pending nudge job";
494 SyncSession* s = job.session.get();
495
496 // Get a fresh session with similar configuration as before (resets
497 // StatusController).
498 scoped_ptr<SyncSession> session(new SyncSession(s->context(),
499 s->delegate(), s->source(), s->routing_info(), s->workers()));
500
501 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
502 make_linked_ptr(session.release()), false,
503 ConfigurationParams(), job.from_here);
504 pending_nudge_.reset(new SyncSessionJob(new_job));
505 return;
506 }
507
508 SDVLOG(2) << "Coalescing a pending nudge";
509 pending_nudge_->session->Coalesce(*(job.session.get()));
510 pending_nudge_->scheduled_start = job.scheduled_start;
511
512 // Unfortunately the nudge location cannot be modified. So it stores the
513 // location of the first caller.
514 }
515
516 bool SyncSchedulerImpl::ShouldRunJob(const SyncSessionJob& job) {
517 DCHECK_EQ(MessageLoop::current(), sync_loop_); 497 DCHECK_EQ(MessageLoop::current(), sync_loop_);
518 DCHECK(started_); 498 DCHECK(started_);
519 499
520 JobProcessDecision decision = DecideOnJob(job); 500 JobProcessDecision decision = DecideOnJob(job);
521 SDVLOG(2) << "Should run " 501 SDVLOG(2) << "Should run "
522 << SyncSessionJob::GetPurposeString(job.purpose) 502 << SyncSessionJob::GetPurposeString(job->purpose())
523 << " job in mode " << GetModeString(mode_) 503 << " job " << job->session()
504 << " in mode " << GetModeString(mode_)
524 << ": " << GetDecisionString(decision); 505 << ": " << GetDecisionString(decision);
525 if (decision != SAVE) 506 if (decision != SAVE)
rlarocque 2012/09/24 21:15:14 These lines have always bugged me. It's not clear
tim (not reviewing) 2012/10/08 00:20:03 Done.
526 return decision == CONTINUE; 507 return decision == CONTINUE;
527 508
528 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == 509 const bool is_nudge = job->purpose() == SyncSessionJob::NUDGE;
529 SyncSessionJob::CONFIGURATION); 510 if (is_nudge && pending_nudge_) {
511 SDVLOG(2) << "Coalescing a pending nudge";
512 // TODO(tim): This basically means we never use the more-careful coalescing
rlarocque 2012/09/24 21:15:14 This is a regression, right? How/Why did this hap
tim (not reviewing) 2012/10/08 00:20:03 It is, but I suspect it has been this way for quit
rlarocque 2012/10/08 19:18:04 Sounds good. I was confused for a bit there; I di
513 // logic in ScheduleNudgeImpl that takes the min of the two nudge start
514 // times, because we're calling this function first. Pull this out
515 // into a function to coalesce + set start times and reuse.
516 pending_nudge_->mutable_session()->Coalesce(*(job->session()));
517 pending_nudge_->set_scheduled_start(job->scheduled_start());
rlarocque 2012/09/24 21:15:14 When a user is actively using their browser, I exp
tim (not reviewing) 2012/10/08 00:20:03 Hmm. Afaik, this set_scheduled_start call doesn't
518 return false;
519 }
530 520
531 SaveJob(job); 521 scoped_ptr<SyncSessionJob> job_to_save = job->CloneAndAbandon();
rlarocque 2012/09/24 21:15:14 This makes me think ShouldRunJobSaveIfNecessary()
tim (not reviewing) 2012/10/08 00:20:03 Well, note the intended contract here is simpler t
522 if (wait_interval_.get() && !wait_interval_->pending_configure_job) {
523 // This job should be made the new canary.
524 if (is_nudge) {
525 pending_nudge_ = job_to_save.get();
526 } else {
527 SDVLOG(2) << "Saving a configuration job";
528 DCHECK_EQ(job->purpose(), SyncSessionJob::CONFIGURATION);
529 DCHECK(!wait_interval_->pending_configure_job);
530 DCHECK_EQ(mode_, CONFIGURATION_MODE);
531 DCHECK(!job->config_params().ready_task.is_null());
532 // The only nudge that could exist is a scheduled canary nudge.
533 DCHECK(!unscheduled_nudge_storage_.get());
534 if (pending_nudge_) {
535 // Pre-empt the nudge canary and abandon the old nudge (owned by task).
536 unscheduled_nudge_storage_ = pending_nudge_->CloneAndAbandon();
rlarocque 2012/09/24 21:15:14 What effect do these two lines have?
tim (not reviewing) 2012/10/08 00:20:03 By "pre-empt" I mean cancel the pending nudge job
537 pending_nudge_ = unscheduled_nudge_storage_.get();
538 }
539 wait_interval_->pending_configure_job = job_to_save.get();
540 }
541 wait_interval_->length =
rlarocque 2012/09/24 21:15:14 Could this end up being zero or negative? What ef
tim (not reviewing) 2012/10/08 00:20:03 Hmph. Good catch - I had fixed this since I hit a
542 wait_interval_->timer.desired_run_time() - TimeTicks::Now();
543 RestartWaiting(job_to_save.Pass());
544 return false;
545 }
546
547 // Note that today there are no cases where we SAVE a CONFIGURATION job
548 // when we're not in a WaitInterval. See bug 147736.
549 DCHECK(is_nudge);
550 // There may or may not be a pending_configure_job. Either way this nudge
551 // is unschedulable.
552 pending_nudge_ = job_to_save.get();
553 unscheduled_nudge_storage_ = job_to_save.Pass();
532 return false; 554 return false;
533 } 555 }
534 556
535 void SyncSchedulerImpl::SaveJob(const SyncSessionJob& job) {
536 DCHECK_EQ(MessageLoop::current(), sync_loop_);
537 if (job.purpose == SyncSessionJob::NUDGE) {
538 SDVLOG(2) << "Saving a nudge job";
539 InitOrCoalescePendingJob(job);
540 } else if (job.purpose == SyncSessionJob::CONFIGURATION){
541 SDVLOG(2) << "Saving a configuration job";
542 DCHECK(wait_interval_.get());
543 DCHECK(mode_ == CONFIGURATION_MODE);
544
545 // Config params should always get set.
546 DCHECK(!job.config_params.ready_task.is_null());
547 SyncSession* old = job.session.get();
548 SyncSession* s(new SyncSession(session_context_, this, old->source(),
549 old->routing_info(), old->workers()));
550 SyncSessionJob new_job(job.purpose,
551 TimeTicks::Now(),
552 make_linked_ptr(s),
553 false,
554 job.config_params,
555 job.from_here);
556 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
557 } // drop the rest.
558 // TODO(sync): Is it okay to drop the rest? It's weird that
559 // SaveJob() only does what it says sometimes. (See
560 // http://crbug.com/90868.)
561 }
562
563 // Functor for std::find_if to search by ModelSafeGroup. 557 // Functor for std::find_if to search by ModelSafeGroup.
564 struct ModelSafeWorkerGroupIs { 558 struct ModelSafeWorkerGroupIs {
565 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {} 559 explicit ModelSafeWorkerGroupIs(ModelSafeGroup group) : group(group) {}
566 bool operator()(ModelSafeWorker* w) { 560 bool operator()(ModelSafeWorker* w) {
567 return group == w->GetModelSafeGroup(); 561 return group == w->GetModelSafeGroup();
568 } 562 }
569 ModelSafeGroup group; 563 ModelSafeGroup group;
570 }; 564 };
571 565
572 void SyncSchedulerImpl::ScheduleNudgeAsync( 566 void SyncSchedulerImpl::ScheduleNudgeAsync(
573 const TimeDelta& delay, 567 const TimeDelta& delay,
574 NudgeSource source, ModelTypeSet types, 568 NudgeSource source, ModelTypeSet types,
575 const tracked_objects::Location& nudge_location) { 569 const tracked_objects::Location& nudge_location) {
576 DCHECK_EQ(MessageLoop::current(), sync_loop_); 570 DCHECK_EQ(MessageLoop::current(), sync_loop_);
577 SDVLOG_LOC(nudge_location, 2) 571 SDVLOG_LOC(nudge_location, 2)
578 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " 572 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, "
579 << "source " << GetNudgeSourceString(source) << ", " 573 << "source " << GetNudgeSourceString(source) << ", "
580 << "types " << ModelTypeSetToString(types); 574 << "types " << ModelTypeSetToString(types);
581 575
582 ModelTypeStateMap type_state_map = 576 ModelTypeStateMap type_state_map =
583 ModelTypeSetToStateMap(types, std::string()); 577 ModelTypeSetToStateMap(types, std::string());
584 SyncSchedulerImpl::ScheduleNudgeImpl(delay, 578 SyncSchedulerImpl::ScheduleNudgeImpl(delay,
585 GetUpdatesFromNudgeSource(source), 579 GetUpdatesFromNudgeSource(source),
586 type_state_map, 580 type_state_map,
587 false,
588 nudge_location); 581 nudge_location);
589 } 582 }
590 583
591 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync( 584 void SyncSchedulerImpl::ScheduleNudgeWithStatesAsync(
592 const TimeDelta& delay, 585 const TimeDelta& delay,
593 NudgeSource source, const ModelTypeStateMap& type_state_map, 586 NudgeSource source, const ModelTypeStateMap& type_state_map,
594 const tracked_objects::Location& nudge_location) { 587 const tracked_objects::Location& nudge_location) {
595 DCHECK_EQ(MessageLoop::current(), sync_loop_); 588 DCHECK_EQ(MessageLoop::current(), sync_loop_);
596 SDVLOG_LOC(nudge_location, 2) 589 SDVLOG_LOC(nudge_location, 2)
597 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, " 590 << "Nudge scheduled with delay " << delay.InMilliseconds() << " ms, "
598 << "source " << GetNudgeSourceString(source) << ", " 591 << "source " << GetNudgeSourceString(source) << ", "
599 << "payloads " 592 << "payloads "
600 << ModelTypeStateMapToString(type_state_map); 593 << ModelTypeStateMapToString(type_state_map);
601 594
602 SyncSchedulerImpl::ScheduleNudgeImpl(delay, 595 SyncSchedulerImpl::ScheduleNudgeImpl(delay,
603 GetUpdatesFromNudgeSource(source), 596 GetUpdatesFromNudgeSource(source),
604 type_state_map, 597 type_state_map,
605 false,
606 nudge_location); 598 nudge_location);
607 } 599 }
608 600
609 void SyncSchedulerImpl::ScheduleNudgeImpl( 601 void SyncSchedulerImpl::ScheduleNudgeImpl(
610 const TimeDelta& delay, 602 const TimeDelta& delay,
611 GetUpdatesCallerInfo::GetUpdatesSource source, 603 GetUpdatesCallerInfo::GetUpdatesSource source,
612 const ModelTypeStateMap& type_state_map, 604 const ModelTypeStateMap& type_state_map,
613 bool is_canary_job, const tracked_objects::Location& nudge_location) { 605 const tracked_objects::Location& nudge_location) {
614 DCHECK_EQ(MessageLoop::current(), sync_loop_); 606 DCHECK_EQ(MessageLoop::current(), sync_loop_);
615 DCHECK(!type_state_map.empty()) << "Nudge scheduled for no types!"; 607 DCHECK(!type_state_map.empty()) << "Nudge scheduled for no types!";
616 608
617 SDVLOG_LOC(nudge_location, 2) 609 SDVLOG_LOC(nudge_location, 2)
618 << "In ScheduleNudgeImpl with delay " 610 << "In ScheduleNudgeImpl with delay "
619 << delay.InMilliseconds() << " ms, " 611 << delay.InMilliseconds() << " ms, "
620 << "source " << GetUpdatesSourceString(source) << ", " 612 << "source " << GetUpdatesSourceString(source) << ", "
621 << "payloads " 613 << "payloads "
622 << ModelTypeStateMapToString(type_state_map) 614 << ModelTypeStateMapToString(type_state_map);
623 << (is_canary_job ? " (canary)" : "");
624 615
625 SyncSourceInfo info(source, type_state_map); 616 SyncSourceInfo info(source, type_state_map);
626 UpdateNudgeTimeRecords(info); 617 UpdateNudgeTimeRecords(info);
627 618
628 SyncSession* session(CreateSyncSession(info)); 619 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(
629 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, 620 SyncSessionJob::NUDGE,
630 make_linked_ptr(session), is_canary_job, 621 TimeTicks::Now() + delay,
631 ConfigurationParams(), nudge_location); 622 CreateSyncSession(info).Pass(),
623 ConfigurationParams(),
624 nudge_location));
632 625
633 session = NULL; 626 if (!ShouldRunJobSaveIfNecessary(job.get()))
634 if (!ShouldRunJob(job))
635 return; 627 return;
636 628
637 if (pending_nudge_.get()) { 629 if (pending_nudge_) {
638 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
639 SDVLOG(2) << "Dropping the nudge because we are in backoff";
640 return;
641 }
642
643 SDVLOG(2) << "Coalescing pending nudge";
644 pending_nudge_->session->Coalesce(*(job.session.get()));
645
646 SDVLOG(2) << "Rescheduling pending nudge"; 630 SDVLOG(2) << "Rescheduling pending nudge";
647 SyncSession* s = pending_nudge_->session.get(); 631 pending_nudge_->mutable_session()->Coalesce(*(job->session()));
648 job.session.reset(new SyncSession(s->context(), s->delegate(), 632 // Choose the start time as the earliest of the 2. Note that this means
649 s->source(), s->routing_info(), s->workers())); 633 // if a nudge arrives with delay (e.g. kDefaultSessionsCommitDelaySeconds)
650 634 // but a nudge is already scheduled to go out, we'll send the (tab) commit
651 // Choose the start time as the earliest of the 2. 635 // without waiting.
652 job.scheduled_start = std::min(job.scheduled_start, 636 pending_nudge_->set_scheduled_start(
653 pending_nudge_->scheduled_start); 637 std::min(job->scheduled_start(), pending_nudge_->scheduled_start()));
654 pending_nudge_.reset(); 638 // Abandon the old task by cloning and replacing the session.
639 // It's possible that by "rescheduling" we're actually taking a job that
640 // was previously unscheduled and giving it wings, so take care to reset
641 // unscheduled nudge storage.
642 job = pending_nudge_->CloneAndAbandon();
643 unscheduled_nudge_storage_.reset();
644 pending_nudge_ = NULL;
655 } 645 }
656 646
657 // TODO(zea): Consider adding separate throttling/backoff for datatype 647 // TODO(zea): Consider adding separate throttling/backoff for datatype
658 // refresh requests. 648 // refresh requests.
659 ScheduleSyncSessionJob(job); 649 ScheduleSyncSessionJob(job.Pass());
660 } 650 }
661 651
662 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) { 652 const char* SyncSchedulerImpl::GetModeString(SyncScheduler::Mode mode) {
663 switch (mode) { 653 switch (mode) {
664 ENUM_CASE(CONFIGURATION_MODE); 654 ENUM_CASE(CONFIGURATION_MODE);
665 ENUM_CASE(NORMAL_MODE); 655 ENUM_CASE(NORMAL_MODE);
666 } 656 }
667 return ""; 657 return "";
668 } 658 }
669 659
670 const char* SyncSchedulerImpl::GetDecisionString( 660 const char* SyncSchedulerImpl::GetDecisionString(
671 SyncSchedulerImpl::JobProcessDecision mode) { 661 SyncSchedulerImpl::JobProcessDecision mode) {
672 switch (mode) { 662 switch (mode) {
673 ENUM_CASE(CONTINUE); 663 ENUM_CASE(CONTINUE);
674 ENUM_CASE(SAVE); 664 ENUM_CASE(SAVE);
675 ENUM_CASE(DROP); 665 ENUM_CASE(DROP);
676 } 666 }
677 return ""; 667 return "";
678 } 668 }
679 669
680 // static
681 void SyncSchedulerImpl::SetSyncerStepsForPurpose(
682 SyncSessionJob::SyncSessionJobPurpose purpose,
683 SyncerStep* start,
684 SyncerStep* end) {
685 switch (purpose) {
686 case SyncSessionJob::CONFIGURATION:
687 *start = DOWNLOAD_UPDATES;
688 *end = APPLY_UPDATES;
689 return;
690 case SyncSessionJob::NUDGE:
691 case SyncSessionJob::POLL:
692 *start = SYNCER_BEGIN;
693 *end = SYNCER_END;
694 return;
695 default:
696 NOTREACHED();
697 *start = SYNCER_END;
698 *end = SYNCER_END;
699 return;
700 }
701 }
702
703 void SyncSchedulerImpl::PostTask( 670 void SyncSchedulerImpl::PostTask(
704 const tracked_objects::Location& from_here, 671 const tracked_objects::Location& from_here,
705 const char* name, const base::Closure& task) { 672 const char* name, const base::Closure& task) {
706 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task"; 673 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task";
707 DCHECK_EQ(MessageLoop::current(), sync_loop_); 674 DCHECK_EQ(MessageLoop::current(), sync_loop_);
708 if (!started_) { 675 if (!started_) {
709 SDVLOG(1) << "Not posting task as scheduler is stopped."; 676 SDVLOG(1) << "Not posting task as scheduler is stopped.";
710 return; 677 return;
711 } 678 }
712 sync_loop_->PostTask(from_here, task); 679 sync_loop_->PostTask(from_here, task);
713 } 680 }
714 681
715 void SyncSchedulerImpl::PostDelayedTask( 682 void SyncSchedulerImpl::PostDelayedTask(
716 const tracked_objects::Location& from_here, 683 const tracked_objects::Location& from_here,
717 const char* name, const base::Closure& task, base::TimeDelta delay) { 684 const char* name, const base::Closure& task, base::TimeDelta delay) {
718 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with " 685 SDVLOG_LOC(from_here, 3) << "Posting " << name << " task with "
719 << delay.InMilliseconds() << " ms delay"; 686 << delay.InMilliseconds() << " ms delay";
720 DCHECK_EQ(MessageLoop::current(), sync_loop_); 687 DCHECK_EQ(MessageLoop::current(), sync_loop_);
721 if (!started_) { 688 if (!started_) {
722 SDVLOG(1) << "Not posting task as scheduler is stopped."; 689 SDVLOG(1) << "Not posting task as scheduler is stopped.";
723 return; 690 return;
724 } 691 }
725 sync_loop_->PostDelayedTask(from_here, task, delay); 692 sync_loop_->PostDelayedTask(from_here, task, delay);
726 } 693 }
727 694
728 void SyncSchedulerImpl::ScheduleSyncSessionJob(const SyncSessionJob& job) { 695 void SyncSchedulerImpl::ScheduleSyncSessionJob(
696 scoped_ptr<SyncSessionJob> job) {
729 DCHECK_EQ(MessageLoop::current(), sync_loop_); 697 DCHECK_EQ(MessageLoop::current(), sync_loop_);
730 if (no_scheduling_allowed_) { 698 if (no_scheduling_allowed_) {
731 NOTREACHED() << "Illegal to schedule job while session in progress."; 699 NOTREACHED() << "Illegal to schedule job while session in progress.";
732 return; 700 return;
733 } 701 }
734 702
735 TimeDelta delay = job.scheduled_start - TimeTicks::Now(); 703 TimeDelta delay = job->scheduled_start() - TimeTicks::Now();
736 if (delay < TimeDelta::FromMilliseconds(0)) 704 if (delay < TimeDelta::FromMilliseconds(0))
737 delay = TimeDelta::FromMilliseconds(0); 705 delay = TimeDelta::FromMilliseconds(0);
738 SDVLOG_LOC(job.from_here, 2) 706 SDVLOG_LOC(job->from_location(), 2)
739 << "In ScheduleSyncSessionJob with " 707 << "In ScheduleSyncSessionJob with "
740 << SyncSessionJob::GetPurposeString(job.purpose) 708 << SyncSessionJob::GetPurposeString(job->purpose())
741 << " job and " << delay.InMilliseconds() << " ms delay"; 709 << " job and " << delay.InMilliseconds() << " ms delay";
742 710
743 DCHECK(job.purpose == SyncSessionJob::NUDGE || 711 DCHECK(job->purpose() == SyncSessionJob::NUDGE ||
744 job.purpose == SyncSessionJob::POLL); 712 job->purpose() == SyncSessionJob::POLL);
745 if (job.purpose == SyncSessionJob::NUDGE) { 713 if (job->purpose() == SyncSessionJob::NUDGE) {
746 SDVLOG_LOC(job.from_here, 2) << "Resetting pending_nudge"; 714 SDVLOG_LOC(job->from_location(), 2) << "Resetting pending_nudge to ";
747 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == 715 DCHECK(!pending_nudge_ || pending_nudge_->session() ==
748 job.session); 716 job->session());
749 pending_nudge_.reset(new SyncSessionJob(job)); 717 pending_nudge_ = job.get();
750 } 718 }
751 PostDelayedTask(job.from_here, "DoSyncSessionJob", 719
752 base::Bind(&SyncSchedulerImpl::DoSyncSessionJob, 720 tracked_objects::Location loc(job->from_location());
753 weak_ptr_factory_.GetWeakPtr(), 721 PostDelayedTask(loc, "DoSyncSessionJob",
akalin 2012/09/25 22:37:24 why not just inline job->from_location() here? Or
tim (not reviewing) 2012/10/08 00:20:03 :( Because base::Passed takes ownership of the job
754 job), 722 base::Bind(base::IgnoreResult(&SyncSchedulerImpl::DoSyncSessionJob),
755 delay); 723 weak_ptr_factory_.GetWeakPtr(),
724 base::Passed(&job)),
725 delay);
756 } 726 }
757 727
758 void SyncSchedulerImpl::DoSyncSessionJob(const SyncSessionJob& job) { 728 bool SyncSchedulerImpl::DoSyncSessionJob(scoped_ptr<SyncSessionJob> job) {
759 DCHECK_EQ(MessageLoop::current(), sync_loop_); 729 DCHECK_EQ(MessageLoop::current(), sync_loop_);
730 if (job->purpose() == SyncSessionJob::NUDGE) {
rlarocque 2012/09/24 21:15:14 I get the feeling that there's not much point in s
tim (not reviewing) 2012/10/08 00:20:03 There's still a chunk of common code, but also, fo
rlarocque 2012/10/08 19:18:04 I see your point, but I think the call sites shoul
tim (not reviewing) 2012/10/11 17:35:14 I'm not convinced we should do this yet. I still
rlarocque 2012/10/11 18:15:08 I think it would be easier to define a function "D
731 if (pending_nudge_ == NULL ||
732 pending_nudge_->session() != job->session()) {
733 // |job| is abandoned.
734 SDVLOG(2) << "Dropping a nudge in "
735 << "DoSyncSessionJob because another nudge was scheduled";
736 return false;
737 }
738 pending_nudge_ = NULL;
739
740 // Rebase the session with the latest model safe table and use it to purge
741 // and update any disabled or modified entries in the job.
742 job->mutable_session()->RebaseRoutingInfoWithLatest(
743 session_context_->routing_info(), session_context_->workers());
744 }
760 745
761 AutoReset<bool> protector(&no_scheduling_allowed_, true); 746 AutoReset<bool> protector(&no_scheduling_allowed_, true);
762 if (!ShouldRunJob(job)) { 747 GetUpdatesCallerInfo::GetUpdatesSource source(
748 job->session()->source().updates_source);
749 if (!ShouldRunJobSaveIfNecessary(job.get())) {
763 SLOG(WARNING) 750 SLOG(WARNING)
764 << "Not executing " 751 << "Not executing "
765 << SyncSessionJob::GetPurposeString(job.purpose) << " job from " 752 << SyncSessionJob::GetPurposeString(job->purpose()) << " job from "
766 << GetUpdatesSourceString(job.session->source().updates_source); 753 << GetUpdatesSourceString(source);
767 return; 754 return false;
768 } 755 }
769 756
770 if (job.purpose == SyncSessionJob::NUDGE) {
771 if (pending_nudge_.get() == NULL ||
772 pending_nudge_->session != job.session) {
773 SDVLOG(2) << "Dropping a nudge in "
774 << "DoSyncSessionJob because another nudge was scheduled";
775 return; // Another nudge must have been scheduled in in the meantime.
776 }
777 pending_nudge_.reset();
778
779 // Create the session with the latest model safe table and use it to purge
780 // and update any disabled or modified entries in the job.
781 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source()));
782
783 job.session->RebaseRoutingInfoWithLatest(*session);
784 }
785 SDVLOG(2) << "DoSyncSessionJob with " 757 SDVLOG(2) << "DoSyncSessionJob with "
786 << SyncSessionJob::GetPurposeString(job.purpose) << " job"; 758 << SyncSessionJob::GetPurposeString(job->purpose()) << " job";
787
788 SyncerStep begin(SYNCER_END);
789 SyncerStep end(SYNCER_END);
790 SetSyncerStepsForPurpose(job.purpose, &begin, &end);
791 759
792 bool has_more_to_sync = true; 760 bool has_more_to_sync = true;
793 while (ShouldRunJob(job) && has_more_to_sync) { 761 bool premature_exit = false;
762 while (DecideOnJob(job.get()) == CONTINUE && has_more_to_sync) {
794 SDVLOG(2) << "Calling SyncShare."; 763 SDVLOG(2) << "Calling SyncShare.";
795 // Synchronously perform the sync session from this thread. 764 // Synchronously perform the sync session from this thread.
796 syncer_->SyncShare(job.session.get(), begin, end); 765 premature_exit = !syncer_->SyncShare(job->mutable_session(),
797 has_more_to_sync = job.session->HasMoreToSync(); 766 job->start_step(),
767 job->end_step());
768
769 has_more_to_sync = job->session()->HasMoreToSync();
798 if (has_more_to_sync) 770 if (has_more_to_sync)
799 job.session->PrepareForAnotherSyncCycle(); 771 job->mutable_session()->PrepareForAnotherSyncCycle();
800 } 772 }
801 SDVLOG(2) << "Done SyncShare looping."; 773 SDVLOG(2) << "Done SyncShare looping.";
802 774
803 FinishSyncSessionJob(job); 775 FinishSyncSessionJob(job.get(), premature_exit);
akalin 2012/10/03 00:11:34 can you make SyncSessionJob::Finish() return the v
tim (not reviewing) 2012/10/08 00:20:03 Yeah, that works! Done.
776 return job->Succeeded();
804 } 777 }
805 778
806 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) { 779 void SyncSchedulerImpl::UpdateNudgeTimeRecords(const SyncSourceInfo& info) {
807 DCHECK_EQ(MessageLoop::current(), sync_loop_); 780 DCHECK_EQ(MessageLoop::current(), sync_loop_);
808 781
809 // We are interested in recording time between local nudges for datatypes. 782 // We are interested in recording time between local nudges for datatypes.
810 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well. 783 // TODO(tim): Consider tracking LOCAL_NOTIFICATION as well.
811 if (info.updates_source != GetUpdatesCallerInfo::LOCAL) 784 if (info.updates_source != GetUpdatesCallerInfo::LOCAL)
812 return; 785 return;
813 786
814 base::TimeTicks now = TimeTicks::Now(); 787 base::TimeTicks now = TimeTicks::Now();
815 // Update timing information for how often datatypes are triggering nudges. 788 // Update timing information for how often datatypes are triggering nudges.
816 for (ModelTypeStateMap::const_iterator iter = info.types.begin(); 789 for (ModelTypeStateMap::const_iterator iter = info.types.begin();
817 iter != info.types.end(); 790 iter != info.types.end();
818 ++iter) { 791 ++iter) {
819 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first]; 792 base::TimeTicks previous = last_local_nudges_by_model_type_[iter->first];
820 last_local_nudges_by_model_type_[iter->first] = now; 793 last_local_nudges_by_model_type_[iter->first] = now;
821 if (previous.is_null()) 794 if (previous.is_null())
822 continue; 795 continue;
823 796
824 #define PER_DATA_TYPE_MACRO(type_str) \ 797 #define PER_DATA_TYPE_MACRO(type_str) \
825 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous); 798 SYNC_FREQ_HISTOGRAM("Sync.Freq" type_str, now - previous);
826 SYNC_DATA_TYPE_HISTOGRAM(iter->first); 799 SYNC_DATA_TYPE_HISTOGRAM(iter->first);
827 #undef PER_DATA_TYPE_MACRO 800 #undef PER_DATA_TYPE_MACRO
828 } 801 }
829 } 802 }
830 803
831 void SyncSchedulerImpl::FinishSyncSessionJob(const SyncSessionJob& job) { 804 void SyncSchedulerImpl::FinishSyncSessionJob(SyncSessionJob* job,
805 bool exited_prematurely) {
832 DCHECK_EQ(MessageLoop::current(), sync_loop_); 806 DCHECK_EQ(MessageLoop::current(), sync_loop_);
833 // Now update the status of the connection from SCM. We need this to decide 807 // Now update the status of the connection from SCM. We need this to decide
834 // whether we need to save/run future jobs. The notifications from SCM are not 808 // whether we need to save/run future jobs. The notifications from SCM are
835 // reliable. 809 // not reliable.
836 // 810 //
837 // TODO(rlarocque): crbug.com/110954 811 // TODO(rlarocque): crbug.com/110954
838 // We should get rid of the notifications and it is probably not needed to 812 // We should get rid of the notifications and it is probably not needed to
839 // maintain this status variable in 2 places. We should query it directly from 813 // maintain this status variable in 2 places. We should query it directly
840 // SCM when needed. 814 // from SCM when needed.
841 ServerConnectionManager* scm = session_context_->connection_manager(); 815 ServerConnectionManager* scm = session_context_->connection_manager();
842 UpdateServerConnectionManagerStatus(scm->server_status()); 816 UpdateServerConnectionManagerStatus(scm->server_status());
843 817
844 if (IsSyncingCurrentlySilenced()) { 818 if (IsSyncingCurrentlySilenced()) {
845 SDVLOG(2) << "We are currently throttled; not scheduling the next sync."; 819 SDVLOG(2) << "We are currently throttled; scheduling Unthrottle.";
846 // TODO(sync): Investigate whether we need to check job.purpose 820 // If we're here, it's because |job| was silenced until a server specified
847 // here; see DCHECKs in SaveJob(). (See http://crbug.com/90868.) 821 // time. (Note, it had to be |job|, because DecideOnJob would not permit
848 SaveJob(job); 822 // any job through while in WaitInterval::THROTTLED).
849 return; // Nothing to do. 823 scoped_ptr<SyncSessionJob> clone = job->CloneAndAbandon();
akalin 2012/10/03 00:11:34 Assume you implement my comment above re. passing
tim (not reviewing) 2012/10/08 00:20:03 Interesting point. In general the logic is if you
850 } else if (job.session->Succeeded() && 824 if (job->purpose() == SyncSessionJob::NUDGE)
851 !job.config_params.ready_task.is_null()) { 825 pending_nudge_ = clone.get();
852 // If this was a configuration job with a ready task, invoke it now that 826 else if (job->purpose() == SyncSessionJob::CONFIGURATION)
853 // we finished successfully. 827 wait_interval_->pending_configure_job = clone.get();
828 else
829 clone.reset(); // Unthrottling is enough, no need to force a canary.
830
831 RestartWaiting(clone.Pass());
832 return;
833 }
834
835 // Let job know that we're through syncing (calling SyncShare) at this point.
836 {
854 AutoReset<bool> protector(&no_scheduling_allowed_, true); 837 AutoReset<bool> protector(&no_scheduling_allowed_, true);
855 job.config_params.ready_task.Run(); 838 job->Finish(exited_prematurely);
856 } 839 }
857 840
858 SDVLOG(2) << "Updating the next polling time after SyncMain"; 841 SDVLOG(2) << "Updating the next polling time after SyncMain";
859 ScheduleNextSync(job); 842 ScheduleNextSync(job);
860 } 843 }
861 844
862 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob& old_job) { 845 void SyncSchedulerImpl::ScheduleNextSync(const SyncSessionJob* old_job) {
863 DCHECK_EQ(MessageLoop::current(), sync_loop_); 846 DCHECK_EQ(MessageLoop::current(), sync_loop_);
864 DCHECK(!old_job.session->HasMoreToSync()); 847 DCHECK(!old_job->session()->HasMoreToSync());
865 848
866 AdjustPolling(&old_job); 849 AdjustPolling(old_job);
867 850
868 if (old_job.session->Succeeded()) { 851 if (old_job->Succeeded()) {
869 // Only reset backoff if we actually reached the server. 852 // Only reset backoff if we actually reached the server.
870 if (old_job.session->SuccessfullyReachedServer()) 853 // It's possible that we reached the server on one attempt, then had an
854 // error on the next (or didn't perform some of the server-communicating
855 // commands). We want to verify that, for all commands attempted, we
856 // successfully spoke with the server. Therefore, we verify no errors
857 // and at least one SYNCER_OK.
858 if (old_job->session()->DidReachServer())
871 wait_interval_.reset(); 859 wait_interval_.reset();
872 SDVLOG(2) << "Job succeeded so not scheduling more jobs"; 860 SDVLOG(2) << "Job succeeded so not scheduling more jobs";
873 return; 861 return;
874 } 862 }
875 863
876 if (old_job.purpose == SyncSessionJob::POLL) { 864 if (old_job->purpose() == SyncSessionJob::POLL) {
877 return; // We don't retry POLL jobs. 865 return; // We don't retry POLL jobs.
878 } 866 }
879 867
880 // TODO(rlarocque): There's no reason why we should blindly backoff and retry 868 // TODO(rlarocque): There's no reason why we should blindly backoff and retry
881 // if we don't succeed. Some types of errors are not likely to disappear on 869 // if we don't succeed. Some types of errors are not likely to disappear on
882 // their own. With the return values now available in the old_job.session, we 870 // their own. With the return values now available in the old_job.session,
883 // should be able to detect such errors and only retry when we detect 871 // we should be able to detect such errors and only retry when we detect
884 // transient errors. 872 // transient errors.
885 873
886 if (IsBackingOff() && wait_interval_->timer.IsRunning() && 874 if (IsBackingOff() && wait_interval_->timer.IsRunning() &&
887 mode_ == NORMAL_MODE) { 875 mode_ == NORMAL_MODE) {
888 // When in normal mode, we allow up to one nudge per backoff interval. It 876 // When in normal mode, we allow up to one nudge per backoff interval. It
889 // appears that this was our nudge for this interval, and it failed. 877 // appears that this was our nudge for this interval, and it failed.
890 // 878 //
891 // Note: This does not prevent us from running canary jobs. For example, an 879 // Note: This does not prevent us from running canary jobs. For example,
892 // IP address change might still result in another nudge being executed 880 // an IP address change might still result in another nudge being executed
893 // during this backoff interval. 881 // during this backoff interval.
894 SDVLOG(2) << "A nudge during backoff failed"; 882 SDVLOG(2) << "A nudge during backoff failed, creating new pending nudge.";
895 883 DCHECK_EQ(SyncSessionJob::NUDGE, old_job->purpose());
896 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
897 DCHECK(!wait_interval_->had_nudge); 884 DCHECK(!wait_interval_->had_nudge);
898 885
899 wait_interval_->had_nudge = true; 886 wait_interval_->had_nudge = true;
900 InitOrCoalescePendingJob(old_job); 887 DCHECK(!pending_nudge_);
901 RestartWaiting(); 888
889 scoped_ptr<SyncSessionJob> new_job = old_job->Clone();
akalin 2012/10/03 00:11:34 same question here -- can you pass ownership to th
tim (not reviewing) 2012/10/08 00:20:03 I did change this around a bit, see my response ab
890 pending_nudge_ = new_job.get();
891 RestartWaiting(new_job.Pass());
902 } else { 892 } else {
903 // Either this is the first failure or a consecutive failure after our 893 // Either this is the first failure or a consecutive failure after our
904 // backoff timer expired. We handle it the same way in either case. 894 // backoff timer expired. We handle it the same way in either case.
905 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed"; 895 SDVLOG(2) << "Non-'backoff nudge' SyncShare job failed";
906 HandleContinuationError(old_job); 896 HandleContinuationError(old_job);
907 } 897 }
908 } 898 }
909 899
910 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) { 900 void SyncSchedulerImpl::AdjustPolling(const SyncSessionJob* old_job) {
911 DCHECK_EQ(MessageLoop::current(), sync_loop_); 901 DCHECK_EQ(MessageLoop::current(), sync_loop_);
912 902
913 TimeDelta poll = (!session_context_->notifications_enabled()) ? 903 TimeDelta poll = (!session_context_->notifications_enabled()) ?
914 syncer_short_poll_interval_seconds_ : 904 syncer_short_poll_interval_seconds_ :
915 syncer_long_poll_interval_seconds_; 905 syncer_long_poll_interval_seconds_;
916 bool rate_changed = !poll_timer_.IsRunning() || 906 bool rate_changed = !poll_timer_.IsRunning() ||
917 poll != poll_timer_.GetCurrentDelay(); 907 poll != poll_timer_.GetCurrentDelay();
918 908
919 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) 909 if (old_job && old_job->purpose() != SyncSessionJob::POLL && !rate_changed)
920 poll_timer_.Reset(); 910 poll_timer_.Reset();
921 911
922 if (!rate_changed) 912 if (!rate_changed)
923 return; 913 return;
924 914
925 // Adjust poll rate. 915 // Adjust poll rate.
926 poll_timer_.Stop(); 916 poll_timer_.Stop();
927 poll_timer_.Start(FROM_HERE, poll, this, 917 poll_timer_.Start(FROM_HERE, poll, this,
928 &SyncSchedulerImpl::PollTimerCallback); 918 &SyncSchedulerImpl::PollTimerCallback);
929 } 919 }
930 920
931 void SyncSchedulerImpl::RestartWaiting() { 921 void SyncSchedulerImpl::RestartWaiting(scoped_ptr<SyncSessionJob> job) {
932 CHECK(wait_interval_.get()); 922 CHECK(wait_interval_.get());
933 wait_interval_->timer.Stop(); 923 wait_interval_->timer.Stop();
934 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, 924 if (wait_interval_->mode == WaitInterval::THROTTLED) {
935 this, &SyncSchedulerImpl::DoCanaryJob); 925 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
926 base::Bind(&SyncSchedulerImpl::Unthrottle,
927 weak_ptr_factory_.GetWeakPtr(),
928 base::Passed(&job)));
929 } else {
930 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length,
931 base::Bind(&SyncSchedulerImpl::DoCanaryJob,
932 weak_ptr_factory_.GetWeakPtr(),
933 base::Passed(&job)));
934 }
936 } 935 }
937 936
938 void SyncSchedulerImpl::HandleContinuationError( 937 void SyncSchedulerImpl::HandleContinuationError(
939 const SyncSessionJob& old_job) { 938 const SyncSessionJob* old_job) {
akalin 2012/10/03 00:11:34 similar to the above, can you pass ownership to th
tim (not reviewing) 2012/10/08 00:20:03 I'm going with the logic that a job has a well def
940 DCHECK_EQ(MessageLoop::current(), sync_loop_); 939 DCHECK_EQ(MessageLoop::current(), sync_loop_);
941 if (DCHECK_IS_ON()) { 940 if (DCHECK_IS_ON()) {
942 if (IsBackingOff()) { 941 if (IsBackingOff()) {
943 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); 942 DCHECK(wait_interval_->timer.IsRunning() || old_job->is_canary());
944 } 943 }
945 } 944 }
946 945
947 TimeDelta length = delay_provider_->GetDelay( 946 TimeDelta length = delay_provider_->GetDelay(
948 IsBackingOff() ? wait_interval_->length : 947 IsBackingOff() ? wait_interval_->length :
949 delay_provider_->GetInitialDelay( 948 delay_provider_->GetInitialDelay(
950 old_job.session->status_controller().model_neutral_state())); 949 old_job->session()->status_controller().model_neutral_state()));
951 950
952 SDVLOG(2) << "In handle continuation error with " 951 SDVLOG(2) << "In handle continuation error with "
953 << SyncSessionJob::GetPurposeString(old_job.purpose) 952 << SyncSessionJob::GetPurposeString(old_job->purpose())
954 << " job. The time delta(ms) is " 953 << " job. The time delta(ms) is "
955 << length.InMilliseconds(); 954 << length.InMilliseconds();
956 955
957 // This will reset the had_nudge variable as well. 956 // This will reset the had_nudge variable as well.
958 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, 957 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
959 length)); 958 length));
960 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { 959 scoped_ptr<SyncSessionJob> new_job(old_job->CloneFromLocation(FROM_HERE));
960 new_job->set_scheduled_start(TimeTicks::Now() + length);
961 if (old_job->purpose() == SyncSessionJob::CONFIGURATION) {
961 SDVLOG(2) << "Configuration did not succeed, scheduling retry."; 962 SDVLOG(2) << "Configuration did not succeed, scheduling retry.";
962 // Config params should always get set. 963 // Config params should always get set.
963 DCHECK(!old_job.config_params.ready_task.is_null()); 964 DCHECK(!old_job->config_params().ready_task.is_null());
964 SyncSession* old = old_job.session.get(); 965 wait_interval_->pending_configure_job = new_job.get();
965 SyncSession* s(new SyncSession(session_context_, this,
966 old->source(), old->routing_info(), old->workers()));
967 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
968 make_linked_ptr(s), false, old_job.config_params,
969 FROM_HERE);
970 wait_interval_->pending_configure_job.reset(new SyncSessionJob(job));
971 } else { 966 } else {
972 // We are not in configuration mode. So wait_interval's pending job 967 // We are not in configuration mode. So wait_interval's pending job
973 // should be null. 968 // should be null.
974 DCHECK(wait_interval_->pending_configure_job.get() == NULL); 969 DCHECK(wait_interval_->pending_configure_job == NULL);
970 DCHECK(!pending_nudge_);
971 pending_nudge_ = new_job.get();
972 }
975 973
976 // TODO(lipalani) - handle clear user data. 974 RestartWaiting(new_job.Pass());
977 InitOrCoalescePendingJob(old_job);
978 }
979 RestartWaiting();
980 } 975 }
981 976
982 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) { 977 void SyncSchedulerImpl::RequestStop(const base::Closure& callback) {
983 syncer_->RequestEarlyExit(); // Safe to call from any thread. 978 syncer_->RequestEarlyExit(); // Safe to call from any thread.
984 DCHECK(weak_handle_this_.IsInitialized()); 979 DCHECK(weak_handle_this_.IsInitialized());
985 SDVLOG(3) << "Posting StopImpl"; 980 SDVLOG(3) << "Posting StopImpl";
986 weak_handle_this_.Call(FROM_HERE, 981 weak_handle_this_.Call(FROM_HERE,
987 &SyncSchedulerImpl::StopImpl, 982 &SyncSchedulerImpl::StopImpl,
988 callback); 983 callback);
989 } 984 }
990 985
991 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) { 986 void SyncSchedulerImpl::StopImpl(const base::Closure& callback) {
992 DCHECK_EQ(MessageLoop::current(), sync_loop_); 987 DCHECK_EQ(MessageLoop::current(), sync_loop_);
993 SDVLOG(2) << "StopImpl called"; 988 SDVLOG(2) << "StopImpl called";
994 989
995 // Kill any in-flight method calls. 990 // Kill any in-flight method calls.
996 weak_ptr_factory_.InvalidateWeakPtrs(); 991 weak_ptr_factory_.InvalidateWeakPtrs();
997 wait_interval_.reset(); 992 wait_interval_.reset();
998 poll_timer_.Stop(); 993 poll_timer_.Stop();
999 if (started_) { 994 if (started_) {
1000 started_ = false; 995 started_ = false;
1001 } 996 }
1002 if (!callback.is_null()) 997 if (!callback.is_null())
1003 callback.Run(); 998 callback.Run();
1004 } 999 }
1005 1000
1006 void SyncSchedulerImpl::DoCanaryJob() { 1001 void SyncSchedulerImpl::DoCanaryJob(scoped_ptr<SyncSessionJob> to_be_canary) {
1007 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1002 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1008 SDVLOG(2) << "Do canary job"; 1003 SDVLOG(2) << "Do canary job";
1009 DoPendingJobIfPossible(true); 1004
1005 // Only set canary privlieges here, when we are about to run the job. This
akalin 2012/09/25 22:37:24 privileges
tim (not reviewing) 2012/10/08 00:20:03 Done.
1006 // avoids confusion in managing canary bits during scheduling, when you
1007 // consider that mode switches (e.g., to config) can "pre-empt" a NUDGE that
1008 // was scheduled as canary, and send it to an "unscheduled" state.
1009 to_be_canary->GrantCanaryPrivilege();
1010
1011 if (to_be_canary->purpose() == SyncSessionJob::NUDGE) {
1012 // TODO(tim): We should be able to remove this...
1013 scoped_ptr<SyncSession> temp = CreateSyncSession(
1014 to_be_canary->session()->source()).Pass();
1015 // The routing info might have been changed since we cached the
1016 // pending nudge. Update it by coalescing to the latest.
1017 to_be_canary->mutable_session()->Coalesce(*(temp));
1018 }
1019 DoSyncSessionJob(to_be_canary.Pass());
1010 } 1020 }
1011 1021
1012 void SyncSchedulerImpl::DoPendingJobIfPossible(bool is_canary_job) { 1022 scoped_ptr<SyncSessionJob> SyncSchedulerImpl::TakePendingJobForCurrentMode() {
1013 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1023 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1014 SyncSessionJob* job_to_execute = NULL; 1024 // If we find a scheduled pending_ job, abandon the old one and return a
1025 // a clone. If unscheduled, just hand over ownership.
1026 scoped_ptr<SyncSessionJob> candidate;
1015 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() 1027 if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
1016 && wait_interval_->pending_configure_job.get()) { 1028 && wait_interval_->pending_configure_job) {
1017 SDVLOG(2) << "Found pending configure job"; 1029 SDVLOG(2) << "Found pending configure job";
1018 job_to_execute = wait_interval_->pending_configure_job.get(); 1030 candidate =
1019 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { 1031 wait_interval_->pending_configure_job->CloneAndAbandon().Pass();
1032 wait_interval_->pending_configure_job = candidate.get();
1033 } else if (mode_ == NORMAL_MODE && pending_nudge_) {
1020 SDVLOG(2) << "Found pending nudge job"; 1034 SDVLOG(2) << "Found pending nudge job";
1021 1035 candidate = pending_nudge_->CloneAndAbandon();
1022 scoped_ptr<SyncSession> session(CreateSyncSession( 1036 pending_nudge_ = candidate.get();
1023 pending_nudge_->session->source())); 1037 unscheduled_nudge_storage_.reset();
1024
1025 // Also the routing info might have been changed since we cached the
1026 // pending nudge. Update it by coalescing to the latest.
1027 pending_nudge_->session->Coalesce(*(session.get()));
1028 // The pending nudge would be cleared in the DoSyncSessionJob function.
1029 job_to_execute = pending_nudge_.get();
1030 } 1038 }
1031 1039 return candidate.Pass();
1032 if (job_to_execute != NULL) {
1033 SDVLOG(2) << "Executing pending job";
1034 SyncSessionJob copy = *job_to_execute;
1035 copy.is_canary_job = is_canary_job;
1036 DoSyncSessionJob(copy);
1037 }
1038 } 1040 }
1039 1041
1040 SyncSession* SyncSchedulerImpl::CreateSyncSession( 1042 scoped_ptr<SyncSession> SyncSchedulerImpl::CreateSyncSession(
1041 const SyncSourceInfo& source) { 1043 const SyncSourceInfo& source) {
1042 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1044 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1043 DVLOG(2) << "Creating sync session with routes " 1045 DVLOG(2) << "Creating sync session with routes "
1044 << ModelSafeRoutingInfoToString(session_context_->routing_info()); 1046 << ModelSafeRoutingInfoToString(session_context_->routing_info());
1045 1047
1046 SyncSourceInfo info(source); 1048 SyncSourceInfo info(source);
1047 SyncSession* session(new SyncSession(session_context_, this, info, 1049 return scoped_ptr<SyncSession>(new SyncSession(session_context_, this, info,
1048 session_context_->routing_info(), session_context_->workers())); 1050 session_context_->routing_info(), session_context_->workers()));
1049
1050 return session;
1051 } 1051 }
1052 1052
1053 void SyncSchedulerImpl::PollTimerCallback() { 1053 void SyncSchedulerImpl::PollTimerCallback() {
1054 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1054 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1055 ModelSafeRoutingInfo r; 1055 ModelSafeRoutingInfo r;
1056 ModelTypeStateMap type_state_map = 1056 ModelTypeStateMap type_state_map =
1057 ModelSafeRoutingInfoToStateMap(r, std::string()); 1057 ModelSafeRoutingInfoToStateMap(r, std::string());
1058 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, type_state_map); 1058 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, type_state_map);
1059 SyncSession* s = CreateSyncSession(info); 1059 scoped_ptr<SyncSession> s(CreateSyncSession(info));
1060 1060 scoped_ptr<SyncSessionJob> job(new SyncSessionJob(SyncSessionJob::POLL,
1061 SyncSessionJob job(SyncSessionJob::POLL, TimeTicks::Now(), 1061 TimeTicks::Now(),
1062 make_linked_ptr(s), 1062 s.Pass(),
1063 false, 1063 ConfigurationParams(),
1064 ConfigurationParams(), 1064 FROM_HERE));
1065 FROM_HERE); 1065 ScheduleSyncSessionJob(job.Pass());
1066
1067 ScheduleSyncSessionJob(job);
1068 } 1066 }
1069 1067
1070 void SyncSchedulerImpl::Unthrottle() { 1068 void SyncSchedulerImpl::Unthrottle(scoped_ptr<SyncSessionJob> to_be_canary) {
1071 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1069 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1072 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); 1070 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
1073 SDVLOG(2) << "Unthrottled."; 1071 SDVLOG(2) << "Unthrottled " << (to_be_canary.get() ? "with " : "without ")
1074 DoCanaryJob(); 1072 << "canary.";
1073 if (to_be_canary.get())
1074 DoCanaryJob(to_be_canary.Pass());
1075
1076 // TODO(tim): ?! This must have been broken. The way DecideOnJob works today
rlarocque 2012/09/24 21:15:14 Agreed, but this comment won't make much sense oth
tim (not reviewing) 2012/10/08 00:20:03 Ah, right. I had left this as a comment for mysel
rlarocque 2012/10/08 19:18:04 On second thought, this function resets the wait_i
tim (not reviewing) 2012/10/11 17:35:14 You're right. It should mean we're good to go on t
1077 // canary privileges aren't enough to bypass a THROTTLED wait interval, which
1078 // would suggest we need to reset first (though trusting canary in Decide is
1079 // probably the "right" thing to do).
1075 wait_interval_.reset(); 1080 wait_interval_.reset();
1076 } 1081 }
1077 1082
1078 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) { 1083 void SyncSchedulerImpl::Notify(SyncEngineEvent::EventCause cause) {
1079 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1084 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1080 session_context_->NotifyListeners(SyncEngineEvent(cause)); 1085 session_context_->NotifyListeners(SyncEngineEvent(cause));
1081 } 1086 }
1082 1087
1083 bool SyncSchedulerImpl::IsBackingOff() const { 1088 bool SyncSchedulerImpl::IsBackingOff() const {
1084 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1089 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1085 return wait_interval_.get() && wait_interval_->mode == 1090 return wait_interval_.get() && wait_interval_->mode ==
1086 WaitInterval::EXPONENTIAL_BACKOFF; 1091 WaitInterval::EXPONENTIAL_BACKOFF;
1087 } 1092 }
1088 1093
1089 void SyncSchedulerImpl::OnSilencedUntil( 1094 void SyncSchedulerImpl::OnSilencedUntil(
1090 const base::TimeTicks& silenced_until) { 1095 const base::TimeTicks& silenced_until) {
1091 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1096 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1092 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, 1097 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
1093 silenced_until - TimeTicks::Now())); 1098 silenced_until - TimeTicks::Now()));
1094 wait_interval_->timer.Start(FROM_HERE, wait_interval_->length, this,
1095 &SyncSchedulerImpl::Unthrottle);
1096 } 1099 }
1097 1100
1098 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() { 1101 bool SyncSchedulerImpl::IsSyncingCurrentlySilenced() {
1099 DCHECK_EQ(MessageLoop::current(), sync_loop_); 1102 DCHECK_EQ(MessageLoop::current(), sync_loop_);
1100 return wait_interval_.get() && wait_interval_->mode == 1103 return wait_interval_.get() && wait_interval_->mode ==
1101 WaitInterval::THROTTLED; 1104 WaitInterval::THROTTLED;
1102 } 1105 }
1103 1106
1104 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate( 1107 void SyncSchedulerImpl::OnReceivedShortPollIntervalUpdate(
1105 const base::TimeDelta& new_interval) { 1108 const base::TimeDelta& new_interval) {
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
1159 1162
1160 #undef SDVLOG_LOC 1163 #undef SDVLOG_LOC
1161 1164
1162 #undef SDVLOG 1165 #undef SDVLOG
1163 1166
1164 #undef SLOG 1167 #undef SLOG
1165 1168
1166 #undef ENUM_CASE 1169 #undef ENUM_CASE
1167 1170
1168 } // namespace syncer 1171 } // namespace syncer
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698