OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "chrome/browser/sync/engine/syncer_thread2.h" | |
6 | |
7 #include "base/rand_util.h" | |
8 #include "chrome/browser/sync/engine/syncer.h" | |
9 | |
10 using base::TimeDelta; | |
11 using base::TimeTicks; | |
12 | |
13 namespace browser_sync { | |
14 | |
15 using sessions::SyncSession; | |
16 using sessions::SyncSessionSnapshot; | |
17 using sessions::SyncSourceInfo; | |
18 using syncable::ModelTypeBitSet; | |
19 using sync_pb::GetUpdatesCallerInfo; | |
20 | |
21 namespace s3 { | |
22 | |
23 SyncerThread::SyncerThread(sessions::SyncSessionContext* context, | |
24 Syncer* syncer) | |
25 : thread_("SyncEngine_SyncerThread"), | |
26 syncer_short_poll_interval_seconds_( | |
27 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | |
28 syncer_long_poll_interval_seconds_( | |
29 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | |
30 server_connection_ok_(false), | |
31 syncer_(syncer), | |
32 session_context_(context) { | |
33 } | |
34 | |
35 SyncerThread::~SyncerThread() { | |
36 DCHECK(!thread_.IsRunning()); | |
37 } | |
38 | |
39 void SyncerThread::Start(Mode mode) { | |
40 if (!thread_.IsRunning() && !thread_.Start()) { | |
41 NOTREACHED() << "Unable to start SyncerThread."; | |
42 return; | |
43 } | |
44 | |
45 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | |
46 this, &SyncerThread::StartImpl, mode)); | |
47 } | |
48 | |
49 void SyncerThread::StartImpl(Mode mode) { | |
50 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
51 DCHECK(!session_context_->account_name().empty()); | |
52 DCHECK(syncer_.get()); | |
53 mode_ = mode; | |
54 AdjustPolling(NULL); // Will kick start poll timer if needed. | |
55 } | |
56 | |
57 bool SyncerThread::ShouldRunJob(SyncSessionJob::Purpose purpose, | |
58 const TimeTicks& scheduled_start) { | |
59 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
60 | |
61 // Check wait interval. | |
62 if (wait_interval_.get()) { | |
63 if (wait_interval_->mode == WaitInterval::THROTTLED) { | |
64 return false; | |
65 } else { | |
akalin
2011/01/20 00:47:57
no need for else since the if returns.
tim (not reviewing)
2011/01/25 03:23:05
Done.
| |
66 DCHECK_EQ(wait_interval_->mode, | |
67 WaitInterval::EXPONENTIAL_BACKOFF); | |
68 switch (purpose) { | |
akalin
2011/01/20 00:47:57
isn't this clearer as:
DCHECK(purpose == POLL ||
tim (not reviewing)
2011/01/25 03:23:05
Done.
| |
69 case SyncSessionJob::NUDGE: | |
70 if (!wait_interval_->had_nudge) | |
71 break; | |
72 default: | |
73 DCHECK(purpose == SyncSessionJob::POLL || | |
74 purpose == SyncSessionJob::NUDGE); | |
75 return false; | |
76 } | |
77 } | |
78 } | |
79 | |
80 // Mode / purpose contract (See 'Mode' enum in header). Don't run jobs that | |
81 // were intended for a normal sync if we are in configuration mode, and vice | |
82 // versa. | |
83 switch (mode_) { | |
84 case CONFIGURATION_MODE: | |
85 if (purpose != SyncSessionJob::CONFIGURATION) | |
86 return false; | |
87 break; | |
88 case NORMAL_MODE: | |
89 if (purpose != SyncSessionJob::POLL && purpose != SyncSessionJob::NUDGE) | |
90 return false; | |
91 break; | |
92 default: | |
93 NOTREACHED() << "Unknown SyncerThread Mode: " << mode_; | |
94 return false; | |
95 } | |
96 | |
97 // Freshness condition. | |
98 if (purpose == SyncSessionJob::NUDGE && | |
99 (scheduled_start < last_sync_session_end_time_)) { | |
100 return false; | |
101 } | |
102 | |
103 return server_connection_ok_; | |
104 } | |
105 | |
106 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource( | |
akalin
2011/01/20 00:47:57
anon namespace?
tim (not reviewing)
2011/01/25 03:23:05
Done.
| |
107 NudgeSource source) { | |
108 switch (source) { | |
109 case NUDGE_SOURCE_NOTIFICATION: | |
110 return GetUpdatesCallerInfo::NOTIFICATION; | |
111 case NUDGE_SOURCE_LOCAL: | |
112 return GetUpdatesCallerInfo::LOCAL; | |
113 case NUDGE_SOURCE_CONTINUATION: | |
114 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; | |
115 case NUDGE_SOURCE_CLEAR_PRIVATE_DATA: | |
116 return GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA; | |
117 case NUDGE_SOURCE_UNKNOWN: | |
118 default: | |
119 return GetUpdatesCallerInfo::UNKNOWN; | |
120 } | |
121 } | |
122 | |
123 void SyncerThread::ScheduleNudge(const TimeDelta& delay, | |
124 NudgeSource source, const ModelTypeBitSet& types) { | |
125 if (!thread_.IsRunning()) { | |
126 NOTREACHED(); | |
127 return; | |
128 } | |
129 | |
130 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | |
131 this, &SyncerThread::ScheduleNudgeImpl, delay, source, types)); | |
132 } | |
133 | |
134 // Functor for std::find_if to search by ModelSafeGroup. | |
akalin
2011/01/20 00:47:57
anon namespace?
tim (not reviewing)
2011/01/25 03:23:05
Done.
| |
135 struct WorkerGroupIs { | |
136 explicit WorkerGroupIs(ModelSafeGroup group) : group(group) {} | |
137 bool operator()(ModelSafeWorker* w) { | |
138 return group == w->GetModelSafeGroup(); | |
139 } | |
140 ModelSafeGroup group; | |
141 }; | |
142 | |
143 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, | |
144 NudgeSource source, const ModelTypeBitSet& model_types) { | |
145 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
146 TimeTicks rough_start = TimeTicks::Now() + delay; | |
147 if (!ShouldRunJob(SyncSessionJob::NUDGE, rough_start)) | |
148 return; | |
149 | |
150 // Note we currently nudge for all types regardless of the ones incurring | |
151 // the nudge. Doing different would throw off some syncer commands like | |
152 // CleanupDisabledTypes. We may want to change this in the future. | |
153 ModelSafeRoutingInfo routes; | |
154 std::vector<ModelSafeWorker*> workers; | |
155 session_context_->registrar()->GetModelSafeRoutingInfo(&routes); | |
156 session_context_->registrar()->GetWorkers(&workers); | |
157 SyncSourceInfo info(GetUpdatesFromNudgeSource(source), model_types); | |
158 | |
159 scoped_ptr<SyncSession> session(new SyncSession( | |
160 session_context_.get(), this, info, routes, workers)); | |
161 | |
162 if (pending_nudge_.get()) { | |
163 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) | |
164 return; | |
165 | |
166 pending_nudge_->session->Coalesce(session.get()); | |
167 if (!IsBackingOff()) | |
168 return; | |
169 } | |
170 ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, session.release()); | |
171 } | |
172 | |
173 // Helper to extract the routing info and workers corresponding to types in | |
174 // |types| from |registrar|. | |
175 void GetModelSafeParamsForTypes(const ModelTypeBitSet& types, | |
176 ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes, | |
177 std::vector<ModelSafeWorker*>* workers) { | |
178 ModelSafeRoutingInfo r_tmp; | |
179 std::vector<ModelSafeWorker*> w_tmp; | |
180 registrar->GetModelSafeRoutingInfo(&r_tmp); | |
181 registrar->GetWorkers(&w_tmp); | |
182 | |
183 typedef std::vector<ModelSafeWorker*>::const_iterator iter; | |
184 for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) { | |
185 if (!types.test(i)) | |
186 continue; | |
187 syncable::ModelType t = syncable::ModelTypeFromInt(i); | |
188 DCHECK_EQ(1U, r_tmp.count(t)); | |
189 (*routes)[t] = r_tmp[t]; | |
190 iter it = std::find_if(w_tmp.begin(), w_tmp.end(), WorkerGroupIs(r_tmp[t])); | |
191 DCHECK(w_tmp.end() != it); | |
akalin
2011/01/20 00:47:57
maybe:
if (it != w_tmp.end() {
workers->push_ba
tim (not reviewing)
2011/01/25 03:23:05
Yeah, probably safer.
| |
192 workers->push_back(*it); | |
193 } | |
194 | |
195 iter it = std::find_if(w_tmp.begin(), w_tmp.end(), | |
196 WorkerGroupIs(GROUP_PASSIVE)); | |
197 DCHECK(w_tmp.end() != it); | |
akalin
2011/01/20 00:47:57
here, too
tim (not reviewing)
2011/01/25 03:23:05
Done.
| |
198 workers->push_back(*it); | |
199 } | |
200 | |
201 void SyncerThread::ScheduleConfig(const TimeDelta& delay, | |
202 const ModelTypeBitSet& types) { | |
203 if (!thread_.IsRunning()) { | |
204 NOTREACHED(); | |
205 return; | |
206 } | |
207 | |
208 ModelSafeRoutingInfo routes; | |
209 std::vector<ModelSafeWorker*> workers; | |
210 GetModelSafeParamsForTypes(types, session_context_->registrar(), | |
211 &routes, &workers); | |
212 | |
213 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | |
214 this, &SyncerThread::ScheduleConfigImpl, delay, routes, workers)); | |
215 } | |
216 | |
217 void SyncerThread::ScheduleConfigImpl(const TimeDelta& delay, | |
218 const ModelSafeRoutingInfo& routing_info, | |
219 const std::vector<ModelSafeWorker*>& workers) { | |
220 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
221 NOTIMPLEMENTED() << "TODO(tim)"; | |
222 } | |
223 | |
224 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, | |
225 SyncSessionJob::Purpose purpose, sessions::SyncSession* session) { | |
226 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
227 SyncSessionJob job = {purpose, TimeTicks::Now() + delay, | |
228 make_linked_ptr(session)}; | |
229 if (purpose == SyncSessionJob::NUDGE) { | |
230 DCHECK(!pending_nudge_.get()); | |
231 pending_nudge_.reset(new SyncSessionJob(job)); | |
232 } | |
233 MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, | |
234 &SyncerThread::DoSyncSessionJob, job), delay.InMilliseconds()); | |
235 } | |
236 | |
237 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { | |
238 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
239 | |
240 if (job.purpose == SyncSessionJob::NUDGE) { | |
241 DCHECK(pending_nudge_.get()); | |
242 if (pending_nudge_->session != job.session) | |
243 return; // Another nudge must have been scheduled in in the meantime. | |
244 pending_nudge_.reset(); | |
245 } else if (job.purpose == SyncSessionJob::CONFIGURATION) { | |
246 NOTIMPLEMENTED() << "TODO(tim): SyncShare [DOWNLOAD_UPDATES,APPLY_UPDATES]"; | |
247 } | |
248 | |
249 bool has_more_to_sync = true; | |
250 while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) { | |
251 VLOG(1) << "SyncerThread: Calling SyncShare."; | |
252 // Synchronously perform the sync session from this thread. | |
253 syncer_->SyncShare(job.session.get()); | |
254 has_more_to_sync = job.session->HasMoreToSync(); | |
255 if (has_more_to_sync) | |
256 job.session->ResetTransientState(); | |
257 } | |
258 VLOG(1) << "SyncerThread: Done SyncShare looping."; | |
259 FinishSyncSessionJob(job); | |
260 } | |
261 | |
262 void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { | |
263 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
264 // Update timing information for how often datatypes are triggering nudges. | |
265 base::TimeTicks now = TimeTicks::Now(); | |
266 for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; | |
267 i < job.session->source().second.size() && | |
268 !last_sync_session_end_time_.is_null(); | |
269 ++i) { | |
270 if (job.session->source().second[i]) { | |
271 syncable::PostTimeToTypeHistogram(syncable::ModelTypeFromInt(i), | |
272 now - last_sync_session_end_time_); | |
273 } | |
274 } | |
275 last_sync_session_end_time_ = now; | |
276 if (IsSyncingCurrentlySilenced()) | |
277 return; // Nothing to do. | |
278 | |
279 VLOG(1) << "Updating the next polling time after SyncMain"; | |
280 ScheduleNextSync(job); | |
281 } | |
282 | |
283 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { | |
284 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
285 const bool work_to_do = | |
286 old_job.session->status_controller()->num_server_changes_remaining() > 0 | |
287 || old_job.session->status_controller()->unsynced_handles().size() > 0; | |
288 VLOG(1) << "syncer has work to do: " << work_to_do; | |
289 | |
290 AdjustPolling(&old_job); | |
291 | |
292 // TODO(tim): Old impl had special code if notifications disabled. Needed? | |
293 if (!work_to_do) | |
294 return; | |
295 | |
296 if (old_job.session->source().first == | |
297 GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { | |
298 // We don't seem to have made forward progress. Start or extend backoff. | |
299 HandleConsecutiveContinuationError(old_job); | |
300 } else if (IsBackingOff()) { | |
301 // We weren't continuing but we're in backoff; must have been a nudge. | |
302 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); | |
303 DCHECK(!wait_interval_->had_nudge); | |
304 wait_interval_->had_nudge = true; | |
305 wait_interval_->timer.Reset(); | |
306 } else { | |
307 // We weren't continuing and we aren't in backoff. Schedule a normal | |
308 // continuation. | |
309 ScheduleNudgeImpl(GetRecommendedDelay(TimeDelta::FromSeconds(0)), | |
310 NUDGE_SOURCE_CONTINUATION, | |
Nicolas Zea
2011/01/20 20:01:57
Just for my understanding here, is this called whe
tim (not reviewing)
2011/01/25 03:23:05
Typically this means some sort of error occurred b
| |
311 old_job.session->source().second); | |
312 } | |
313 } | |
314 | |
315 void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) { | |
316 DCHECK(thread_.IsRunning()); | |
317 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
318 | |
319 TimeDelta poll = (!session_context_->notifications_enabled()) ? | |
320 syncer_short_poll_interval_seconds_ : | |
321 syncer_long_poll_interval_seconds_; | |
322 bool rate_changed = !poll_timer_.IsRunning() || | |
323 poll != poll_timer_.GetCurrentDelay(); | |
324 | |
325 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed) | |
326 poll_timer_.Reset(); | |
327 | |
328 if (!rate_changed) | |
329 return; | |
330 | |
331 // Adjust poll rate. | |
332 poll_timer_.Stop(); | |
333 poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback); | |
334 } | |
335 | |
336 void SyncerThread::HandleConsecutiveContinuationError( | |
337 const SyncSessionJob& old_job) { | |
338 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
339 DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning()); | |
340 SyncSession* old = old_job.session.get(); | |
341 SyncSession* s(new SyncSession(session_context_.get(), this, | |
342 old->source(), old->routing_info(), old->workers())); | |
343 TimeDelta length = GetRecommendedDelay( | |
344 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); | |
345 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | |
346 length)); | |
347 SyncSessionJob job = {SyncSessionJob::NUDGE, TimeTicks::Now() + length, | |
348 make_linked_ptr(s)}; | |
349 pending_nudge_.reset(new SyncSessionJob(job)); | |
350 wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob); | |
351 } | |
352 | |
353 // static | |
354 TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) { | |
355 if (last_delay.InSeconds() >= kMaxBackoffSeconds) | |
356 return TimeDelta::FromSeconds(kMaxBackoffSeconds); | |
357 | |
358 // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2 | |
359 int64 backoff_s = | |
360 std::max(1LL, last_delay.InSeconds() * kBackoffRandomizationFactor); | |
361 | |
362 // Flip a coin to randomize backoff interval by +/- 50%. | |
363 int rand_sign = base::RandInt(0, 1) * 2 - 1; | |
364 | |
365 // Truncation is adequate for rounding here. | |
366 backoff_s = backoff_s + | |
367 (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); | |
368 | |
369 // Cap the backoff interval. | |
370 backoff_s = std::max(1LL, std::min(backoff_s, kMaxBackoffSeconds)); | |
371 | |
372 return TimeDelta::FromSeconds(backoff_s); | |
373 } | |
374 | |
375 void SyncerThread::Stop() { | |
376 syncer_->RequestEarlyExit(); // Safe to call from any thread. | |
377 thread_.Stop(); | |
378 Notify(SyncEngineEvent::SYNCER_THREAD_EXITING); | |
379 } | |
380 | |
381 void SyncerThread::DoCanaryJob() { | |
382 DCHECK(pending_nudge_.get()); | |
383 wait_interval_->had_nudge = false; | |
384 DoSyncSessionJob(*pending_nudge_); | |
385 } | |
386 | |
387 void SyncerThread::PollTimerCallback() { | |
388 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
389 ModelSafeRoutingInfo r; | |
390 std::vector<ModelSafeWorker*> w; | |
391 session_context_->registrar()->GetModelSafeRoutingInfo(&r); | |
392 session_context_->registrar()->GetWorkers(&w); | |
393 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, ModelTypeBitSet()); | |
394 SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w); | |
395 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s); | |
396 } | |
397 | |
398 void SyncerThread::Unthrottle() { | |
399 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | |
400 wait_interval_.reset(); | |
401 } | |
402 | |
403 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { | |
404 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
405 session_context_->NotifyListeners(SyncEngineEvent(cause)); | |
406 } | |
407 | |
408 bool SyncerThread::IsBackingOff() const { | |
409 return wait_interval_.get() && wait_interval_->mode == | |
410 WaitInterval::EXPONENTIAL_BACKOFF; | |
411 } | |
412 | |
413 void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) { | |
414 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED, | |
415 silenced_until - TimeTicks::Now())); | |
416 wait_interval_->timer.Start(wait_interval_->length, this, | |
417 &SyncerThread::Unthrottle); | |
418 } | |
419 | |
420 bool SyncerThread::IsSyncingCurrentlySilenced() { | |
akalin
2011/01/20 00:47:57
const method?
tim (not reviewing)
2011/01/25 03:23:05
Yeah, probably should be - I started to make this
| |
421 return wait_interval_.get() && wait_interval_->mode == | |
422 WaitInterval::THROTTLED; | |
423 } | |
424 | |
425 void SyncerThread::OnReceivedShortPollIntervalUpdate( | |
426 const base::TimeDelta& new_interval) { | |
427 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
428 syncer_short_poll_interval_seconds_ = new_interval; | |
429 } | |
430 | |
431 void SyncerThread::OnReceivedLongPollIntervalUpdate( | |
432 const base::TimeDelta& new_interval) { | |
433 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | |
434 syncer_long_poll_interval_seconds_ = new_interval; | |
435 } | |
436 | |
437 void SyncerThread::OnShouldStopSyncingPermanently() { | |
438 syncer_->RequestEarlyExit(); // Thread-safe. | |
439 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); | |
440 } | |
441 | |
442 void SyncerThread::OnServerConnectionEvent( | |
443 const ServerConnectionEvent& event) { | |
444 NOTIMPLEMENTED(); | |
445 } | |
446 | |
447 void SyncerThread::set_notifications_enabled(bool notifications_enabled) { | |
448 session_context_->set_notifications_enabled(notifications_enabled); | |
449 } | |
450 | |
451 } // s3 | |
452 } // browser_sync | |
OLD | NEW |