OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "chrome/browser/sync/engine/syncer_thread2.h" | |
6 | |
7 #include "base/rand_util.h" | |
8 #include "chrome/browser/sync/engine/syncer.h" | |
9 | |
10 using base::TimeDelta; | |
11 using base::TimeTicks; | |
12 | |
13 namespace browser_sync { | |
14 | |
15 using sessions::SyncSession; | |
16 using sessions::SyncSessionSnapshot; | |
17 using sessions::SyncSourceInfo; | |
18 using syncable::ModelTypeBitSet; | |
19 using sync_pb::GetUpdatesCallerInfo; | |
20 | |
21 namespace s3 { | |
22 | |
23 SyncerThread::SyncerThread(sessions::SyncSessionContext* context, | |
24 Syncer* syncer) | |
25 : thread_("SyncEngine_SyncerThread"), | |
26 syncer_short_poll_interval_seconds_( | |
27 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | |
28 syncer_long_poll_interval_seconds_( | |
29 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | |
30 server_connection_ok_(false), | |
31 syncer_(syncer), | |
32 session_context_(context) { | |
33 } | |
34 | |
35 SyncerThread::~SyncerThread() { | |
36 DCHECK(!thread_.IsRunning()); | |
37 } | |
38 | |
39 void SyncerThread::Start(Mode mode) { | |
40 if (!thread_.IsRunning() && !thread_.Start()) { | |
41 NOTREACHED() << "Unable to start SyncerThread."; | |
42 return; | |
43 } | |
44 | |
45 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | |
46 this, &SyncerThread::StartImpl, mode)); | |
47 } | |
48 | |
49 void SyncerThread::StartImpl(Mode mode) { | |
50 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
51 DCHECK(!session_context_->account_name().empty()); | |
52 DCHECK(syncer_.get()); | |
53 mode_ = mode; | |
54 AdjustPolling(NULL); // Will kick start poll timer if needed. | |
55 } | |
56 | |
57 bool SyncerThread::ShouldRunJob(SyncSessionJob::Purpose purpose, | |
58 const TimeTicks& scheduled_start) { | |
59 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
60 | |
61 // Check wait interval. | |
62 if (wait_interval_.get()) { | |
63 if (wait_interval_->mode == WaitInterval::THROTTLED) { | |
64 return false; | |
65 } else { | |
66 DCHECK_EQ(wait_interval_->mode, | |
67 WaitInterval::EXPONENTIAL_BACKOFF); | |
68 switch (purpose) { | |
69 case SyncSessionJob::NUDGE: | |
70 if (!wait_interval_->had_nudge) | |
Nicolas Zea
2010/12/23 21:49:20
Why don't we back off if we haven't had a nudge?
tim (not reviewing)
2010/12/24 04:23:47
the code allows one nudge per backoff interval (ak
| |
71 break; | |
72 default: | |
73 DCHECK(purpose == SyncSessionJob::POLL || | |
74 purpose == SyncSessionJob::NUDGE); | |
75 return false; | |
76 } | |
77 } | |
78 } | |
79 | |
80 // Mode / purpose contract. | |
Nicolas Zea
2010/12/23 21:49:20
An explanation of when the contract might be viola
tim (not reviewing)
2010/12/24 04:23:47
Ok. I'll adapt the 'Mode' comment in the header a
| |
81 switch (mode_) { | |
82 case CONFIGURATION_MODE: | |
83 if (purpose != SyncSessionJob::CONFIGURATION) | |
84 return false; | |
85 break; | |
86 case NORMAL_MODE: | |
87 if (purpose != SyncSessionJob::POLL && purpose != SyncSessionJob::NUDGE) | |
88 return false; | |
89 break; | |
90 default: | |
91 NOTREACHED() << "Unknown SyncerThread Mode: " << mode_; | |
92 return false; | |
93 } | |
94 | |
95 // Freshness condition. | |
96 if (purpose == SyncSessionJob::NUDGE && | |
97 (scheduled_start < last_sync_session_end_time_)) { | |
98 return false; | |
99 } | |
100 | |
101 return server_connection_ok_; | |
102 } | |
103 | |
104 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( | |
105 NudgeSource source) { | |
106 switch (source) { | |
107 case NUDGE_SOURCE_NOTIFICATION: | |
108 return GetUpdatesCallerInfo::NOTIFICATION; | |
109 case NUDGE_SOURCE_LOCAL: | |
110 return GetUpdatesCallerInfo::LOCAL; | |
111 case NUDGE_SOURCE_CONTINUATION: | |
112 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | |
113 case NUDGE_SOURCE_CLEAR_PRIVATE_DATA: | |
114 return GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA; | |
115 case NUDGE_SOURCE_UNKNOWN: | |
116 default: | |
117 return GetUpdatesCallerInfo::UNKNOWN; | |
118 } | |
119 } | |
120 | |
121 void SyncerThread::ScheduleNudge(const TimeDelta& delay, | |
122 NudgeSource source, const ModelTypeBitSet& types) { | |
123 if (!thread_.IsRunning()) { | |
124 NOTREACHED(); | |
125 return; | |
126 } | |
127 | |
128 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | |
129 this, &SyncerThread::ScheduleNudgeImpl, delay, source, types)); | |
130 } | |
131 | |
132 // Functor for std::find_if to search by ModelSafeGroup. | |
133 struct WorkerGroupIs { | |
134 explicit WorkerGroupIs(ModelSafeGroup group) : group(group) {} | |
135 bool operator()(ModelSafeWorker* w) { | |
136 return group == w->GetModelSafeGroup(); | |
137 } | |
138 ModelSafeGroup group; | |
139 }; | |
140 | |
141 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, | |
142 NudgeSource source, const ModelTypeBitSet& model_types) { | |
143 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
144 TimeTicks rough_start = TimeTicks::Now() + delay; | |
145 if (!ShouldRunJob(SyncSessionJob::NUDGE, rough_start)) | |
146 return; | |
147 | |
148 // Note we currently nudge for all types regardless of the ones incurring | |
149 // the nudge. Doing different would throw off some syncer commands like | |
150 // CleanupDisabledTypes. We may want to change this in the future. | |
151 ModelSafeRoutingInfo routes; | |
152 std::vector<ModelSafeWorker*> workers; | |
153 session_context_->registrar()->GetModelSafeRoutingInfo(&routes); | |
154 session_context_->registrar()->GetWorkers(&workers); | |
155 SyncSourceInfo info(GetUpdatesFromNudgeSource(source), model_types); | |
156 | |
157 scoped_refptr<SyncSession> session = new SyncSession( | |
158 session_context_.get(), this, info, routes, workers); | |
159 | |
160 if (pending_nudge_.get()) { | |
161 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) | |
162 return; | |
163 | |
164 pending_nudge_->session->Coalesce(session); | |
165 if (!IsBackingOff()) | |
166 return; | |
167 } | |
168 ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, session.release()); | |
169 } | |
170 | |
171 // Helper to extract the routing info and workers corresponding to types in | |
172 // |types| from |registrar|. | |
173 void GetModelSafeParamsForTypes(const ModelTypeBitSet& types, | |
174 ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes, | |
175 std::vector<ModelSafeWorker*>* workers) { | |
176 ModelSafeRoutingInfo r_tmp; | |
177 std::vector<ModelSafeWorker*> w_tmp; | |
178 registrar->GetModelSafeRoutingInfo(&r_tmp); | |
179 registrar->GetWorkers(&w_tmp); | |
180 | |
181 typedef std::vector<ModelSafeWorker*>::const_iterator iter; | |
182 for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) { | |
183 if (!types.test(i)) | |
184 continue; | |
185 syncable::ModelType t = syncable::ModelTypeFromInt(i); | |
186 DCHECK_EQ(1U, r_tmp.count(t)); | |
187 (*routes)[t] = r_tmp[t]; | |
188 iter it = std::find_if(w_tmp.begin(), w_tmp.end(), WorkerGroupIs(r_tmp[t])); | |
189 DCHECK(w_tmp.end() != it); | |
190 workers->push_back(*it); | |
191 } | |
192 | |
193 iter it = std::find_if(w_tmp.begin(), w_tmp.end(), | |
194 WorkerGroupIs(GROUP_PASSIVE)); | |
195 DCHECK(w_tmp.end() != it); | |
196 workers->push_back(*it); | |
197 } | |
198 | |
199 void SyncerThread::ScheduleConfig(const TimeDelta& delay, | |
200 const ModelTypeBitSet& types) { | |
201 if (!thread_.IsRunning()) { | |
202 NOTREACHED(); | |
203 return; | |
204 } | |
205 | |
206 ModelSafeRoutingInfo routes; | |
207 std::vector<ModelSafeWorker*> workers; | |
208 GetModelSafeParamsForTypes(types, session_context_->registrar(), | |
209 &routes, &workers); | |
210 | |
211 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | |
212 this, &SyncerThread::ScheduleConfigImpl, delay, routes, workers)); | |
213 } | |
214 | |
215 void SyncerThread::ScheduleConfigImpl(const TimeDelta& delay, | |
216 const ModelSafeRoutingInfo& routing_info, | |
217 const std::vector<ModelSafeWorker*>& workers) { | |
218 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
219 NOTIMPLEMENTED() << "TODO(tim)"; | |
220 } | |
221 | |
222 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, | |
223 SyncSessionJob::Purpose purpose, sessions::SyncSession* session) { | |
224 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
225 SyncSessionJob job = {purpose, TimeTicks::Now() + delay, session}; | |
226 if (purpose == SyncSessionJob::NUDGE) { | |
227 DCHECK(!pending_nudge_.get()); | |
228 pending_nudge_.reset(new SyncSessionJob(job)); | |
229 } | |
230 MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, | |
231 &SyncerThread::DoSyncSessionJob, job), delay.InMilliseconds()); | |
232 } | |
233 | |
234 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { | |
235 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
236 | |
237 if (job.purpose == SyncSessionJob::NUDGE) { | |
238 DCHECK(pending_nudge_.get()); | |
239 if (pending_nudge_->session != job.session) | |
240 return; // Another nudge must have been scheduled in in the meantime. | |
241 pending_nudge_.reset(); | |
242 } else if (job.purpose == SyncSessionJob::CONFIGURATION) { | |
243 NOTIMPLEMENTED() << "TODO(tim): SyncShare [DOWNLOAD_UPDATES,APPLY_UPDATES]"; | |
244 } | |
245 | |
246 bool has_more_to_sync = true; | |
247 while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) { | |
248 VLOG(1) << "SyncerThread: Calling SyncShare."; | |
249 // Synchronously perform the sync session from this thread. | |
250 syncer_->SyncShare(job.session); | |
251 has_more_to_sync = job.session->HasMoreToSync(); | |
252 if (has_more_to_sync) | |
253 job.session->ResetTransientState(); | |
254 } | |
255 VLOG(1) << "SyncerThread: Done SyncShare looping."; | |
256 FinishSyncSessionJob(job); | |
257 } | |
258 | |
259 void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { | |
260 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
261 // Update timing information for how often datatypes are triggering nudges. | |
262 base::TimeTicks now = TimeTicks::Now(); | |
263 for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; | |
264 i < job.session->source().second.size() && | |
265 !last_sync_session_end_time_.is_null(); | |
266 ++i) { | |
267 if (job.session->source().second[i]) { | |
268 syncable::PostTimeToTypeHistogram(syncable::ModelTypeFromInt(i), | |
269 now - last_sync_session_end_time_); | |
270 } | |
271 } | |
272 last_sync_session_end_time_ = now; | |
273 if (IsSyncingCurrentlySilenced()) | |
274 return; // Nothing to do. | |
275 | |
276 VLOG(1) << "Updating the next polling time after SyncMain"; | |
277 ScheduleNextSync(job); | |
278 } | |
279 | |
280 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { | |
281 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
282 const bool work_to_do = | |
283 old_job.session->status_controller()->num_server_changes_remaining() > | |
284 old_job.session->status_controller()->ComputeMaxLocalTimestamp() || | |
285 old_job.session->status_controller()->unsynced_handles().size() > 0; | |
286 VLOG(1) << "syncer has work to do: " << work_to_do; | |
287 | |
288 AdjustPolling(&old_job); | |
289 | |
290 // TODO(tim): Old impl had special code if notifications disabled. Needed? | |
291 if (!work_to_do) | |
292 return; | |
293 | |
294 if (old_job.session->source().first == | |
295 GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { | |
296 // We don't seem to have made forward progress. Start or extend backoff. | |
297 HandleConsecutiveContinuationError(old_job); | |
298 } else if (IsBackingOff()) { | |
299 // We weren't continuing but we're in backoff; must have been a nudge. | |
300 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); | |
301 DCHECK(!wait_interval_->had_nudge); | |
302 wait_interval_->had_nudge = true; | |
303 wait_interval_->timer.Reset(); | |
304 } else { | |
305 // We weren't continuing and we aren't in backoff. Schedule a normal | |
306 // continuation. | |
307 ScheduleNudgeImpl(GetRecommendedDelay(TimeDelta::FromSeconds(0)), | |
308 NUDGE_SOURCE_CONTINUATION, | |
309 old_job.session->source().second); | |
310 } | |
311 } | |
312 | |
313 void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { | |
314 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
315 TimeDelta poll = (!session_context_->notifications_enabled()) ? | |
316 syncer_short_poll_interval_seconds_ : | |
317 syncer_long_poll_interval_seconds_; | |
318 bool rate_changed = !poll_timer_.IsRunning() || | |
319 poll != poll_timer_.GetCurrentDelay(); | |
320 | |
321 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) | |
322 poll_timer_.Reset(); | |
323 | |
324 if (!rate_changed) | |
325 return; | |
326 | |
327 // Adjust poll rate. | |
328 poll_timer_.Stop(); | |
329 poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback); | |
330 } | |
331 | |
332 void SyncerThread::HandleConsecutiveContinuationError( | |
333 const SyncSessionJob& old_job) { | |
334 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
335 DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning()); | |
336 SyncSession* old = old_job.session; | |
337 SyncSession* s(new SyncSession(session_context_.get(), this, | |
338 old->source(), old->routing_info(), old->workers())); | |
339 TimeDelta length = GetRecommendedDelay( | |
340 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); | |
341 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | |
342 length)); | |
343 SyncSessionJob job = {SyncSessionJob::NUDGE, TimeTicks::Now() + length, s}; | |
344 pending_nudge_.reset(new SyncSessionJob(job)); | |
345 wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); | |
346 } | |
347 | |
348 // static | |
349 TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { | |
350 if (last_delay.InSeconds() >= kMaxBackoffSeconds) | |
351 return TimeDelta::FromSeconds(kMaxBackoffSeconds); | |
352 | |
353 // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 | |
354 int64 backoff_s = | |
355 std::max(1LL, last_delay.InSeconds() * kBackoffRandomizationFactor); | |
356 | |
357 // Flip a coin to randomize backoff interval by +/- 50%. | |
358 int rand_sign = base::RandInt(0, 1) * 2 - 1; | |
359 | |
360 // Truncation is adequate for rounding here. | |
361 backoff_s = backoff_s + | |
362 (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); | |
363 | |
364 // Cap the backoff interval. | |
365 backoff_s = std::max(1LL, std::min(backoff_s, kMaxBackoffSeconds)); | |
366 | |
367 return TimeDelta::FromSeconds(backoff_s); | |
368 } | |
369 | |
370 void SyncerThread::Stop() { | |
371 syncer_->RequestEarlyExit(); // Safe to call from any thread. | |
372 thread_.Stop(); | |
373 Notify(SyncEngineEvent::SYNCER_THREAD_EXITING); | |
374 } | |
375 | |
376 void SyncerThread::DoCanaryJob() { | |
377 DCHECK(pending_nudge_.get()); | |
378 wait_interval_->had_nudge = false; | |
379 DoSyncSessionJob(*pending_nudge_); | |
380 } | |
381 | |
382 void SyncerThread::PollTimerCallback() { | |
383 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
384 ModelSafeRoutingInfo r; | |
385 std::vector<ModelSafeWorker*> w; | |
386 session_context_->registrar()->GetModelSafeRoutingInfo(&r); | |
387 session_context_->registrar()->GetWorkers(&w); | |
388 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, ModelTypeBitSet()); | |
389 SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w); | |
390 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s); | |
391 } | |
392 | |
393 void SyncerThread::Unthrottle() { | |
394 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | |
395 wait_interval_.reset(); | |
396 } | |
397 | |
398 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { | |
399 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
400 session_context_->NotifyListeners(SyncEngineEvent(cause)); | |
401 } | |
402 | |
403 bool SyncerThread::IsBackingOff() const { | |
404 return wait_interval_.get() && wait_interval_->mode == | |
405 WaitInterval::EXPONENTIAL_BACKOFF; | |
406 } | |
407 | |
408 void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) { | |
409 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, | |
410 silenced_until - TimeTicks::Now())); | |
411 wait_interval_->timer.Start(wait_interval_->length, this, | |
412 &SyncerThread::Unthrottle); | |
413 } | |
414 | |
415 bool SyncerThread::IsSyncingCurrentlySilenced() { | |
416 return wait_interval_.get() && wait_interval_->mode == | |
417 WaitInterval::THROTTLED; | |
418 } | |
419 | |
420 void SyncerThread::OnReceivedShortPollIntervalUpdate( | |
421 const base::TimeDelta& new_interval) { | |
422 syncer_short_poll_interval_seconds_ = new_interval; | |
423 } | |
424 | |
425 void SyncerThread::OnReceivedLongPollIntervalUpdate( | |
426 const base::TimeDelta& new_interval) { | |
427 syncer_long_poll_interval_seconds_ = new_interval; | |
428 } | |
429 | |
430 void SyncerThread::OnShouldStopSyncingPermanently() { | |
431 syncer_->RequestEarlyExit(); // Thread-safe. | |
432 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); | |
433 } | |
434 | |
435 void SyncerThread::OnServerConnectionEvent( | |
436 const ServerConnectionEvent& event) { | |
437 NOTIMPLEMENTED(); | |
438 } | |
439 | |
440 } // s3 | |
441 } // browser_sync | |
OLD | NEW |