Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(174)

Side by Side Diff: chrome/browser/sync/engine/syncer_thread2.cc

Issue 6812004: sync: Make nudge + config jobs reliable in SyncerThread2 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fix a bug. Created 9 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « chrome/browser/sync/engine/syncer_thread2.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/sync/engine/syncer_thread2.h" 5 #include "chrome/browser/sync/engine/syncer_thread2.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/rand_util.h" 9 #include "base/rand_util.h"
10 #include "chrome/browser/sync/engine/syncer.h" 10 #include "chrome/browser/sync/engine/syncer.h"
11 11
12 using base::TimeDelta; 12 using base::TimeDelta;
13 using base::TimeTicks; 13 using base::TimeTicks;
14 14
15 namespace browser_sync { 15 namespace browser_sync {
16 16
17 using sessions::SyncSession; 17 using sessions::SyncSession;
18 using sessions::SyncSessionSnapshot; 18 using sessions::SyncSessionSnapshot;
19 using sessions::SyncSourceInfo; 19 using sessions::SyncSourceInfo;
20 using syncable::ModelTypePayloadMap; 20 using syncable::ModelTypePayloadMap;
21 using syncable::ModelTypeBitSet; 21 using syncable::ModelTypeBitSet;
22 using sync_pb::GetUpdatesCallerInfo; 22 using sync_pb::GetUpdatesCallerInfo;
23 23
24 namespace s3 { 24 namespace s3 {
25 25
26 struct SyncerThread::SyncSessionJob {
27 SyncSessionJobPurpose purpose;
28 base::TimeTicks scheduled_start;
29 linked_ptr<sessions::SyncSession> session;
30
31 // This is the location the nudge came from. used for debugging purpose.
32 // In case of multiple nudges getting coalesced this stores the first nudge
33 // that came in.
34 tracked_objects::Location nudge_location;
35 };
36
26 struct SyncerThread::WaitInterval { 37 struct SyncerThread::WaitInterval {
27 enum Mode { 38 enum Mode {
28 // A wait interval whose duration has been affected by exponential 39 // A wait interval whose duration has been affected by exponential
29 // backoff. 40 // backoff.
30 // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval. 41 // EXPONENTIAL_BACKOFF intervals are nudge-rate limited to 1 per interval.
31 EXPONENTIAL_BACKOFF, 42 EXPONENTIAL_BACKOFF,
32 // A server-initiated throttled interval. We do not allow any syncing 43 // A server-initiated throttled interval. We do not allow any syncing
33 // during such an interval. 44 // during such an interval.
34 THROTTLED, 45 THROTTLED,
35 }; 46 };
36 Mode mode; 47 Mode mode;
37 48
38 // This bool is set to true if we have observed a nudge during this 49 // This bool is set to true if we have observed a nudge during this
39 // interval and mode == EXPONENTIAL_BACKOFF. 50 // interval and mode == EXPONENTIAL_BACKOFF.
40 bool had_nudge; 51 bool had_nudge;
41 base::TimeDelta length; 52 base::TimeDelta length;
42 base::OneShotTimer<SyncerThread> timer; 53 base::OneShotTimer<SyncerThread> timer;
54 scoped_ptr<SyncSessionJob> pending_job;
43 WaitInterval(Mode mode, base::TimeDelta length); 55 WaitInterval(Mode mode, base::TimeDelta length);
44 }; 56 };
45 57
46 struct SyncerThread::SyncSessionJob {
47 SyncSessionJobPurpose purpose;
48 base::TimeTicks scheduled_start;
49 linked_ptr<sessions::SyncSession> session;
50
51 // This is the location the nudge came from. used for debugging purpose.
52 // In case of multiple nudges getting coalesced this stores the first nudge
53 // that came in.
54 tracked_objects::Location nudge_location;
55 };
56
57 SyncerThread::DelayProvider::DelayProvider() {} 58 SyncerThread::DelayProvider::DelayProvider() {}
58 SyncerThread::DelayProvider::~DelayProvider() {} 59 SyncerThread::DelayProvider::~DelayProvider() {}
59 60
60 TimeDelta SyncerThread::DelayProvider::GetDelay( 61 TimeDelta SyncerThread::DelayProvider::GetDelay(
61 const base::TimeDelta& last_delay) { 62 const base::TimeDelta& last_delay) {
62 return SyncerThread::GetRecommendedDelay(last_delay); 63 return SyncerThread::GetRecommendedDelay(last_delay);
63 } 64 }
64 65
65 SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) 66 SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
66 : mode(mode), had_nudge(false), length(length) { } 67 : mode(mode), had_nudge(false), length(length) { }
67 68
68 SyncerThread::SyncerThread(sessions::SyncSessionContext* context, 69 SyncerThread::SyncerThread(sessions::SyncSessionContext* context,
69 Syncer* syncer) 70 Syncer* syncer)
70 : thread_("SyncEngine_SyncerThread"), 71 : thread_("SyncEngine_SyncerThread"),
71 syncer_short_poll_interval_seconds_( 72 syncer_short_poll_interval_seconds_(
72 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), 73 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
73 syncer_long_poll_interval_seconds_( 74 syncer_long_poll_interval_seconds_(
74 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), 75 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
75 mode_(NORMAL_MODE), 76 mode_(NORMAL_MODE),
76 server_connection_ok_(false), 77 server_connection_ok_(false),
77 delay_provider_(new DelayProvider()), 78 delay_provider_(new DelayProvider()),
78 syncer_(syncer), 79 syncer_(syncer),
79 session_context_(context) { 80 session_context_(context),
81 saved_nudge_(false) {
80 } 82 }
81 83
82 SyncerThread::~SyncerThread() { 84 SyncerThread::~SyncerThread() {
83 DCHECK(!thread_.IsRunning()); 85 DCHECK(!thread_.IsRunning());
84 } 86 }
85 87
86 void SyncerThread::CheckServerConnectionManagerStatus( 88 void SyncerThread::CheckServerConnectionManagerStatus(
87 HttpResponse::ServerConnectionCode code) { 89 HttpResponse::ServerConnectionCode code) {
88 // Note, be careful when adding cases here because if the SyncerThread 90 // Note, be careful when adding cases here because if the SyncerThread
89 // thinks there is no valid connection as determined by this method, it 91 // thinks there is no valid connection as determined by this method, it
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
133 135
134 void SyncerThread::StartImpl(Mode mode, 136 void SyncerThread::StartImpl(Mode mode,
135 linked_ptr<ModeChangeCallback> callback) { 137 linked_ptr<ModeChangeCallback> callback) {
136 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 138 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
137 DCHECK(!session_context_->account_name().empty()); 139 DCHECK(!session_context_->account_name().empty());
138 DCHECK(syncer_.get()); 140 DCHECK(syncer_.get());
139 mode_ = mode; 141 mode_ = mode;
140 AdjustPolling(NULL); // Will kick start poll timer if needed. 142 AdjustPolling(NULL); // Will kick start poll timer if needed.
141 if (callback.get()) 143 if (callback.get())
142 callback->Run(); 144 callback->Run();
145
146 if (mode_ == NORMAL_MODE && saved_nudge_ == true) {
147 syncable::ModelTypePayloadMap map;
148 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), NUDGE_SOURCE_NOTIFICATION,
149 map, FROM_HERE);
150 }
143 } 151 }
144 152
145 bool SyncerThread::ShouldRunJob(SyncSessionJobPurpose purpose, 153 SyncerThread::JobProcessDecision SyncerThread::ShouldRunNudgeJob(
146 const TimeTicks& scheduled_start) { 154 const base::TimeTicks& scheduled_start) {
147 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
148
149 // Check wait interval.
150 if (wait_interval_.get()) { 155 if (wait_interval_.get()) {
151 // TODO(tim): Consider different handling for CLEAR_USER_DATA (i.e. permit 156 // Means we are either in throttled or exponential back off.
152 // when throttled). 157 // However it is also possible that we are in exponential back off
153 if (wait_interval_->mode == WaitInterval::THROTTLED) 158 // and we are retrying.(In that case the timer ran out but wait_interval
154 return false; 159 // is not yet cleared.) wait_interval_.timer_running is false;
160 if (wait_interval_->mode == WaitInterval::THROTTLED) {
161 return SAVE;
162 }
155 163
156 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); 164 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
157 if ((purpose != NUDGE) || wait_interval_->had_nudge) 165 if (mode_ == CONFIGURATION) {
158 return false; 166 return SAVE;
167 }
168 // The mode is normal. We already had a nudge. No point retrying.
169 if (wait_interval_->had_nudge) {
170 return DROP;
171 }
172 // Either this is the first nudge or we are in exponential backoff
173 // and we are trying because our timer ran out.(In either case had_nudge
174 // is false)
175 return CONTINUE;
159 } 176 }
160 177
161 // Mode / purpose contract (See 'Mode' enum in header). Don't run jobs that 178 // We are not in any kind of backoff.
162 // were intended for a normal sync if we are in configuration mode, and vice 179 if (mode_ == CONFIGURATION) {
163 // versa. 180 return SAVE;
164 switch (mode_) { 181 }
165 case CONFIGURATION_MODE: 182 // Freshness condition
166 if (purpose != CONFIGURATION) 183 if (scheduled_start < last_sync_session_end_time_) {
167 return false; 184 return DROP;
168 break;
169 case NORMAL_MODE:
170 if (purpose == CONFIGURATION)
171 return false;
172 break;
173 default:
174 NOTREACHED() << "Unknown SyncerThread Mode: " << mode_;
175 return false;
176 } 185 }
177 186
178 // Continuation NUDGE tasks have priority over POLLs because they are the 187 return CONTINUE;
179 // only tasks that trigger exponential backoff, so this prevents them from 188 }
180 // being starved from running (e.g. due to a very, very low poll interval, 189
181 // such as 0ms). It's rare that this would ever matter in practice. 190 // Note: We should never be dropping a config request.
182 if (purpose == POLL && (pending_nudge_.get() && 191 SyncerThread::JobProcessDecision SyncerThread::ShouldRunConfigureJob() {
tim (not reviewing) 2011/04/07 06:13:07 wouldn't it result in a substantial amount less co
lipalani1 2011/04/07 18:35:45 The original function did its job well in terms of
183 pending_nudge_->session->source().updates_source == 192 if (wait_interval_.get()) {
184 GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION)) { 193 if (wait_interval_->mode == WaitInterval::THROTTLED) {
194 return SAVE;
195 }
196
197 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
198 DCHECK(mode_ == CONFIGURATION_MODE);
199 if (wait_interval_->timer.IsRunning() == true) {
200 return SAVE;
201 }
202
203 return CONTINUE;
204 }
205
206 // We are not in any kind of back off.
207 DCHECK(mode_ == CONFIGURATION_MODE);
208 return CONTINUE;
209 }
210
211 SyncerThread::JobProcessDecision SyncerThread::ShouldRunJobDefaultImpl() {
212 if (wait_interval_.get()) {
213 return DROP;
214 }
215
216 if (mode_ == CONFIGURATION) {
217 return DROP;
218 }
219
220 return CONTINUE;
221 }
222
223 SyncerThread::JobProcessDecision SyncerThread::ShouldRunJob(
224 SyncSessionJobPurpose purpose,
225 const TimeTicks& scheduled_start) {
226 if (purpose == NUDGE) {
227 return ShouldRunNudgeJob(scheduled_start);
228 } else if (purpose == CONFIGURATION) {
229 return ShouldRunConfigureJob();
230 } else {
231 return ShouldRunJobDefaultImpl();
232 }
233 }
234
235 bool SyncerThread::ProcessJob(const SyncSessionJob& job) {
tim (not reviewing) 2011/04/07 06:13:07 It's pretty confusing that we now have both Proces
lipalani1 2011/04/07 18:35:45 Will do. Regarding job creation it was also done
236 JobProcessDecision decision = ShouldRunJob(job.purpose, job.scheduled_start);
237 if (decision == DROP) {
185 return false; 238 return false;
186 } 239 }
187 240
188 // Freshness condition. 241 if (decision == CONTINUE) {
189 if (purpose == NUDGE && 242 return true;
190 (scheduled_start < last_sync_session_end_time_)) {
191 return false;
192 } 243 }
193 244
194 return server_connection_ok_; 245 DCHECK(job.purpose == NUDGE || job.purpose == CONFIGURATION);
246 if (job.purpose == NUDGE) {
247 saved_nudge_ = true;
248 } else {
249 DCHECK(wait_interval_.get());
250 DCHECK(mode_ == CONFIGURATION_MODE);
251
252 // Save off the nudge if we had one already stored.
253 if (wait_interval_->pending_job.get()) {
254 if (wait_interval_->pending_job->purpose == NUDGE) {
255 saved_nudge_ = true;
256 }
257 }
258 SyncSession* old = job.session.get();
259 SyncSession* s(new SyncSession(session_context_.get(), this,
260 old->source(), old->routing_info(), old->workers()));
261 SyncSessionJob new_job = {job.purpose, TimeTicks::Now(),
262 make_linked_ptr(s), job.nudge_location};
263 wait_interval_->pending_job.reset(new SyncSessionJob(new_job));
264 }
265 return false;
195 } 266 }
196 267
197 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( 268 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
198 NudgeSource source) { 269 NudgeSource source) {
199 switch (source) { 270 switch (source) {
200 case NUDGE_SOURCE_NOTIFICATION: 271 case NUDGE_SOURCE_NOTIFICATION:
201 return GetUpdatesCallerInfo::NOTIFICATION; 272 return GetUpdatesCallerInfo::NOTIFICATION;
202 case NUDGE_SOURCE_LOCAL: 273 case NUDGE_SOURCE_LOCAL:
203 return GetUpdatesCallerInfo::LOCAL; 274 return GetUpdatesCallerInfo::LOCAL;
204 case NUDGE_SOURCE_CONTINUATION: 275 case NUDGE_SOURCE_CONTINUATION:
(...skipping 58 matching lines...) Expand 10 before | Expand all | Expand 10 after
263 SyncSourceInfo(), ModelSafeRoutingInfo(), 334 SyncSourceInfo(), ModelSafeRoutingInfo(),
264 std::vector<ModelSafeWorker*>()); 335 std::vector<ModelSafeWorker*>());
265 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CLEAR_USER_DATA, session, 336 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CLEAR_USER_DATA, session,
266 FROM_HERE); 337 FROM_HERE);
267 } 338 }
268 339
269 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, 340 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay,
270 NudgeSource source, const ModelTypePayloadMap& types_with_payloads, 341 NudgeSource source, const ModelTypePayloadMap& types_with_payloads,
271 const tracked_objects::Location& nudge_location) { 342 const tracked_objects::Location& nudge_location) {
272 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 343 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
273 TimeTicks rough_start = TimeTicks::Now() + delay;
274 if (!ShouldRunJob(NUDGE, rough_start)) {
275 LOG(WARNING) << "Dropping nudge at scheduling time, source = "
276 << source;
277 return;
278 }
279 344
280 // Note we currently nudge for all types regardless of the ones incurring 345 // Note we currently nudge for all types regardless of the ones incurring
281 // the nudge. Doing different would throw off some syncer commands like 346 // the nudge. Doing different would throw off some syncer commands like
282 // CleanupDisabledTypes. We may want to change this in the future. 347 // CleanupDisabledTypes. We may want to change this in the future.
283 ModelSafeRoutingInfo routes; 348 ModelSafeRoutingInfo routes;
284 std::vector<ModelSafeWorker*> workers; 349 std::vector<ModelSafeWorker*> workers;
285 session_context_->registrar()->GetModelSafeRoutingInfo(&routes); 350 session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
286 session_context_->registrar()->GetWorkers(&workers); 351 session_context_->registrar()->GetWorkers(&workers);
287 SyncSourceInfo info(GetUpdatesFromNudgeSource(source), 352 SyncSourceInfo info(GetUpdatesFromNudgeSource(source),
288 types_with_payloads); 353 types_with_payloads);
289 354
290 scoped_ptr<SyncSession> session(new SyncSession( 355 SyncSession* session(new SyncSession(
291 session_context_.get(), this, info, routes, workers)); 356 session_context_.get(), this, info, routes, workers));
292 357
358 SyncSessionJob job = {NUDGE, TimeTicks::Now() + delay,
359 make_linked_ptr(session), nudge_location};
360
361 session = NULL;
362 if (!ProcessJob(job)) {
tim (not reviewing) 2011/04/07 06:13:07 We only had to move this down because we must pass
lipalani1 2011/04/07 18:35:45 The other option as I mentioned was to pass only t
363 return;
364 }
365
293 if (pending_nudge_.get()) { 366 if (pending_nudge_.get()) {
294 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) 367 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1))
295 return; 368 return;
296 369
297 pending_nudge_->session->Coalesce(*session.get()); 370 pending_nudge_->session->Coalesce(*(job.session.get()));
298 371
299 if (!IsBackingOff()) { 372 if (!IsBackingOff()) {
300 return; 373 return;
301 } else { 374 } else {
302 // Re-schedule the current pending nudge. 375 // Re-schedule the current pending nudge.
303 SyncSession* s = pending_nudge_->session.get(); 376 SyncSession* s = pending_nudge_->session.get();
304 session.reset(new SyncSession(s->context(), s->delegate(), s->source(), 377 job.session.reset(new SyncSession(s->context(), s->delegate(),
305 s->routing_info(), s->workers())); 378 s->source(), s->routing_info(), s->workers()));
306 pending_nudge_.reset(); 379 pending_nudge_.reset();
307 } 380 }
308 } 381 }
309 ScheduleSyncSessionJob(delay, NUDGE, session.release(), nudge_location); 382
383 // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob.
tim (not reviewing) 2011/04/07 06:13:07 if we do that then there is no point to having Sch
lipalani1 2011/04/07 18:35:45 Same as above. On 2011/04/07 06:13:07, timsteele w
384 ScheduleSyncSessionJob(delay, NUDGE, job.session.release(), nudge_location);
310 } 385 }
311 386
312 // Helper to extract the routing info and workers corresponding to types in 387 // Helper to extract the routing info and workers corresponding to types in
313 // |types| from |registrar|. 388 // |types| from |registrar|.
314 void GetModelSafeParamsForTypes(const ModelTypeBitSet& types, 389 void GetModelSafeParamsForTypes(const ModelTypeBitSet& types,
315 ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes, 390 ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes,
316 std::vector<ModelSafeWorker*>* workers) { 391 std::vector<ModelSafeWorker*>* workers) {
317 ModelSafeRoutingInfo r_tmp; 392 ModelSafeRoutingInfo r_tmp;
318 std::vector<ModelSafeWorker*> w_tmp; 393 std::vector<ModelSafeWorker*> w_tmp;
319 registrar->GetModelSafeRoutingInfo(&r_tmp); 394 registrar->GetModelSafeRoutingInfo(&r_tmp);
(...skipping 27 matching lines...) Expand all
347 NOTREACHED(); 422 NOTREACHED();
348 return; 423 return;
349 } 424 }
350 425
351 ModelSafeRoutingInfo routes; 426 ModelSafeRoutingInfo routes;
352 std::vector<ModelSafeWorker*> workers; 427 std::vector<ModelSafeWorker*> workers;
353 GetModelSafeParamsForTypes(types, session_context_->registrar(), 428 GetModelSafeParamsForTypes(types, session_context_->registrar(),
354 &routes, &workers); 429 &routes, &workers);
355 430
356 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 431 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
357 this, &SyncerThread::ScheduleConfigImpl, routes, workers)); 432 this, &SyncerThread::ScheduleConfigImpl, routes, workers,
433 GetUpdatesCallerInfo::FIRST_UPDATE));
358 } 434 }
359 435
360 void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, 436 void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info,
361 const std::vector<ModelSafeWorker*>& workers) { 437 const std::vector<ModelSafeWorker*>& workers,
438 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) {
362 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 439 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
363 440
364 // TODO(tim): config-specific GetUpdatesCallerInfo value? 441 // TODO(tim): config-specific GetUpdatesCallerInfo value?
365 SyncSession* session = new SyncSession(session_context_.get(), this, 442 SyncSession* session = new SyncSession(session_context_.get(), this,
366 SyncSourceInfo(GetUpdatesCallerInfo::FIRST_UPDATE, 443 SyncSourceInfo(source,
367 syncable::ModelTypePayloadMapFromRoutingInfo( 444 syncable::ModelTypePayloadMapFromRoutingInfo(
368 routing_info, std::string())), 445 routing_info, std::string())),
369 routing_info, workers); 446 routing_info, workers);
370 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CONFIGURATION, session, 447 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CONFIGURATION, session,
371 FROM_HERE); 448 FROM_HERE);
372 } 449 }
373 450
374 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, 451 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay,
375 SyncSessionJobPurpose purpose, sessions::SyncSession* session, 452 SyncSessionJobPurpose purpose, sessions::SyncSession* session,
376 const tracked_objects::Location& nudge_location) { 453 const tracked_objects::Location& nudge_location) {
377 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 454 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
378 455
379 SyncSessionJob job = {purpose, TimeTicks::Now() + delay, 456 SyncSessionJob job = {purpose, TimeTicks::Now() + delay,
380 make_linked_ptr(session), nudge_location}; 457 make_linked_ptr(session), nudge_location};
381 if (purpose == NUDGE) { 458 if (purpose == NUDGE) {
(...skipping 20 matching lines...) Expand all
402 case POLL: 479 case POLL:
403 *start = SYNCER_BEGIN; 480 *start = SYNCER_BEGIN;
404 return; 481 return;
405 default: 482 default:
406 NOTREACHED(); 483 NOTREACHED();
407 } 484 }
408 } 485 }
409 486
410 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { 487 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
411 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 488 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
412 if (!ShouldRunJob(job.purpose, job.scheduled_start)) { 489 if (!ProcessJob(job)) {
413 LOG(WARNING) << "Dropping nudge at DoSyncSessionJob, source = " 490 LOG(WARNING) << "Dropping nudge at DoSyncSessionJob, source = "
414 << job.session->source().updates_source; 491 << job.session->source().updates_source;
415 return; 492 return;
416 } 493 }
417 494
418 if (job.purpose == NUDGE) { 495 if (job.purpose == NUDGE) {
419 DCHECK(pending_nudge_.get()); 496 DCHECK(pending_nudge_.get());
420 if (pending_nudge_->session != job.session) 497 if (pending_nudge_->session != job.session)
421 return; // Another nudge must have been scheduled in in the meantime. 498 return; // Another nudge must have been scheduled in in the meantime.
422 pending_nudge_.reset(); 499 pending_nudge_.reset();
500 saved_nudge_ = false;
423 } 501 }
424 502
425 SyncerStep begin(SYNCER_BEGIN); 503 SyncerStep begin(SYNCER_BEGIN);
426 SyncerStep end(SYNCER_END); 504 SyncerStep end(SYNCER_END);
427 SetSyncerStepsForPurpose(job.purpose, &begin, &end); 505 SetSyncerStepsForPurpose(job.purpose, &begin, &end);
428 506
429 bool has_more_to_sync = true; 507 bool has_more_to_sync = true;
430 while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) { 508 while (ProcessJob(job) && has_more_to_sync) {
431 VLOG(1) << "SyncerThread: Calling SyncShare."; 509 VLOG(1) << "SyncerThread: Calling SyncShare.";
432 // Synchronously perform the sync session from this thread. 510 // Synchronously perform the sync session from this thread.
433 syncer_->SyncShare(job.session.get(), begin, end); 511 syncer_->SyncShare(job.session.get(), begin, end);
434 has_more_to_sync = job.session->HasMoreToSync(); 512 has_more_to_sync = job.session->HasMoreToSync();
435 if (has_more_to_sync) 513 if (has_more_to_sync)
436 job.session->ResetTransientState(); 514 job.session->ResetTransientState();
437 } 515 }
438 VLOG(1) << "SyncerThread: Done SyncShare looping."; 516 VLOG(1) << "SyncerThread: Done SyncShare looping.";
439 FinishSyncSessionJob(job); 517 FinishSyncSessionJob(job);
440 } 518 }
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
512 HandleConsecutiveContinuationError(old_job); 590 HandleConsecutiveContinuationError(old_job);
513 } else if (IsBackingOff()) { 591 } else if (IsBackingOff()) {
514 // We weren't continuing but we're in backoff; must have been a nudge. 592 // We weren't continuing but we're in backoff; must have been a nudge.
515 DCHECK_EQ(NUDGE, old_job.purpose); 593 DCHECK_EQ(NUDGE, old_job.purpose);
516 DCHECK(!wait_interval_->had_nudge); 594 DCHECK(!wait_interval_->had_nudge);
517 wait_interval_->had_nudge = true; 595 wait_interval_->had_nudge = true;
518 wait_interval_->timer.Reset(); 596 wait_interval_->timer.Reset();
519 } else { 597 } else {
520 // We weren't continuing and we aren't in backoff. Schedule a normal 598 // We weren't continuing and we aren't in backoff. Schedule a normal
521 // continuation. 599 // continuation.
522 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), NUDGE_SOURCE_CONTINUATION, 600 if (old_job.purpose == NUDGE) {
523 old_job.session->source().types, FROM_HERE); 601 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), NUDGE_SOURCE_CONTINUATION,
602 old_job.session->source().types, FROM_HERE);
603 } else if (old_job.purpose == CONFIGURATION) {
604 ScheduleConfigImpl(old_job.session->routing_info(),
605 old_job.session->workers(),
606 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION));
607 } // Drop the rest.
524 } 608 }
525 } 609 }
526 610
527 void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { 611 void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) {
528 DCHECK(thread_.IsRunning()); 612 DCHECK(thread_.IsRunning());
529 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 613 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
530 614
531 TimeDelta poll = (!session_context_->notifications_enabled()) ? 615 TimeDelta poll = (!session_context_->notifications_enabled()) ?
532 syncer_short_poll_interval_seconds_ : 616 syncer_short_poll_interval_seconds_ :
533 syncer_long_poll_interval_seconds_; 617 syncer_long_poll_interval_seconds_;
(...skipping 15 matching lines...) Expand all
549 const SyncSessionJob& old_job) { 633 const SyncSessionJob& old_job) {
550 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 634 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
551 DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning()); 635 DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning());
552 SyncSession* old = old_job.session.get(); 636 SyncSession* old = old_job.session.get();
553 SyncSession* s(new SyncSession(session_context_.get(), this, 637 SyncSession* s(new SyncSession(session_context_.get(), this,
554 old->source(), old->routing_info(), old->workers())); 638 old->source(), old->routing_info(), old->workers()));
555 TimeDelta length = delay_provider_->GetDelay( 639 TimeDelta length = delay_provider_->GetDelay(
556 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); 640 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1));
557 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, 641 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
558 length)); 642 length));
559 SyncSessionJob job = {NUDGE, TimeTicks::Now() + length, 643 SyncSessionJob job = {old_job.purpose, TimeTicks::Now() + length,
560 make_linked_ptr(s), FROM_HERE}; 644 make_linked_ptr(s), FROM_HERE};
561 pending_nudge_.reset(new SyncSessionJob(job)); 645 wait_interval_->pending_job.reset(new SyncSessionJob(job));
562 wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); 646 wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob);
563 } 647 }
564 648
565 // static 649 // static
566 TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { 650 TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) {
567 if (last_delay.InSeconds() >= kMaxBackoffSeconds) 651 if (last_delay.InSeconds() >= kMaxBackoffSeconds)
568 return TimeDelta::FromSeconds(kMaxBackoffSeconds); 652 return TimeDelta::FromSeconds(kMaxBackoffSeconds);
569 653
570 // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 654 // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2
571 int64 backoff_s = 655 int64 backoff_s =
(...skipping 14 matching lines...) Expand all
586 return TimeDelta::FromSeconds(backoff_s); 670 return TimeDelta::FromSeconds(backoff_s);
587 } 671 }
588 672
589 void SyncerThread::Stop() { 673 void SyncerThread::Stop() {
590 syncer_->RequestEarlyExit(); // Safe to call from any thread. 674 syncer_->RequestEarlyExit(); // Safe to call from any thread.
591 session_context_->connection_manager()->RemoveListener(this); 675 session_context_->connection_manager()->RemoveListener(this);
592 thread_.Stop(); 676 thread_.Stop();
593 } 677 }
594 678
595 void SyncerThread::DoCanaryJob() { 679 void SyncerThread::DoCanaryJob() {
596 DCHECK(pending_nudge_.get());
597 wait_interval_->had_nudge = false; 680 wait_interval_->had_nudge = false;
598 SyncSessionJob copy = *pending_nudge_; 681 SyncSessionJob copy = *(wait_interval_->pending_job);
682 if (copy.purpose == CONFIGURATION) {
683 DCHECK(mode_ == CONFIGURATION_MODE);
684 } else {
685 pending_nudge_.reset(new SyncSessionJob(copy));
686 }
599 DoSyncSessionJob(copy); 687 DoSyncSessionJob(copy);
600 } 688 }
601 689
602 void SyncerThread::PollTimerCallback() { 690 void SyncerThread::PollTimerCallback() {
603 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 691 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
604 ModelSafeRoutingInfo r; 692 ModelSafeRoutingInfo r;
605 std::vector<ModelSafeWorker*> w; 693 std::vector<ModelSafeWorker*> w;
606 session_context_->registrar()->GetModelSafeRoutingInfo(&r); 694 session_context_->registrar()->GetModelSafeRoutingInfo(&r);
607 session_context_->registrar()->GetWorkers(&w); 695 session_context_->registrar()->GetWorkers(&w);
608 ModelTypePayloadMap types_with_payloads = 696 ModelTypePayloadMap types_with_payloads =
609 syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); 697 syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string());
610 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); 698 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads);
611 SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w); 699 SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w);
612 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), POLL, s, FROM_HERE); 700 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), POLL, s, FROM_HERE);
613 } 701 }
614 702
615 void SyncerThread::Unthrottle() { 703 void SyncerThread::Unthrottle() {
616 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); 704 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
705 DoCanaryJob();
617 wait_interval_.reset(); 706 wait_interval_.reset();
618 } 707 }
619 708
620 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { 709 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) {
621 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 710 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
622 session_context_->NotifyListeners(SyncEngineEvent(cause)); 711 session_context_->NotifyListeners(SyncEngineEvent(cause));
623 } 712 }
624 713
625 bool SyncerThread::IsBackingOff() const { 714 bool SyncerThread::IsBackingOff() const {
626 return wait_interval_.get() && wait_interval_->mode == 715 return wait_interval_.get() && wait_interval_->mode ==
(...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after
662 &SyncerThread::CheckServerConnectionManagerStatus, 751 &SyncerThread::CheckServerConnectionManagerStatus,
663 event.connection_code)); 752 event.connection_code));
664 } 753 }
665 754
666 void SyncerThread::set_notifications_enabled(bool notifications_enabled) { 755 void SyncerThread::set_notifications_enabled(bool notifications_enabled) {
667 session_context_->set_notifications_enabled(notifications_enabled); 756 session_context_->set_notifications_enabled(notifications_enabled);
668 } 757 }
669 758
670 } // s3 759 } // s3
671 } // browser_sync 760 } // browser_sync
OLDNEW
« no previous file with comments | « chrome/browser/sync/engine/syncer_thread2.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698