Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(291)

Side by Side Diff: chrome/browser/sync/engine/syncer_thread2.cc

Issue 5939006: sync: beginnings of MessageLoop based SyncerThread (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: comment Created 10 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/sync/engine/syncer_thread2.h"
6
7 #include "base/rand_util.h"
8 #include "chrome/browser/sync/engine/syncer.h"
9
10 using base::TimeDelta;
11 using base::TimeTicks;
12
13 namespace browser_sync {
14
15 using sessions::SyncSession;
16 using sessions::SyncSessionSnapshot;
17 using sessions::SyncSourceInfo;
18 using syncable::ModelTypeBitSet;
19 using sync_pb::GetUpdatesCallerInfo;
20
21 namespace s3 {
22
23 SyncerThread::SyncerThread(sessions::SyncSessionContext* context,
24 Syncer* syncer)
25 : thread_("SyncEngine_SyncerThread"),
26 syncer_short_poll_interval_seconds_(
27 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
28 syncer_long_poll_interval_seconds_(
29 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
30 server_connection_ok_(false),
31 syncer_(syncer),
32 session_context_(context) {
33 }
34
35 SyncerThread::~SyncerThread() {
36 DCHECK(!thread_.IsRunning());
37 }
38
39 void SyncerThread::Start(Mode mode) {
40 if (!thread_.IsRunning() && !thread_.Start()) {
41 NOTREACHED() << "Unable to start SyncerThread.";
42 return;
43 }
44
45 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
46 this, &SyncerThread::StartImpl, mode));
47 }
48
49 void SyncerThread::StartImpl(Mode mode) {
50 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
51 DCHECK(!session_context_->account_name().empty());
52 DCHECK(syncer_.get());
53 mode_ = mode;
54 AdjustPolling(NULL); // Will kick start poll timer if needed.
55 }
56
57 bool SyncerThread::ShouldRunJob(SyncSessionJob::Purpose purpose,
58 const TimeTicks& scheduled_start) {
59 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
60
61 // Check wait interval.
62 if (wait_interval_.get()) {
63 if (wait_interval_->mode == WaitInterval::THROTTLED) {
64 return false;
65 } else {
66 DCHECK_EQ(wait_interval_->mode,
67 WaitInterval::EXPONENTIAL_BACKOFF);
68 switch (purpose) {
69 case SyncSessionJob::NUDGE:
70 if (!wait_interval_->had_nudge)
Nicolas Zea 2010/12/23 21:49:20 Why don't we back off if we haven't had a nudge?
tim (not reviewing) 2010/12/24 04:23:47 the code allows one nudge per backoff interval (ak
71 break;
72 default:
73 DCHECK(purpose == SyncSessionJob::POLL ||
74 purpose == SyncSessionJob::NUDGE);
75 return false;
76 }
77 }
78 }
79
80 // Mode / purpose contract.
Nicolas Zea 2010/12/23 21:49:20 An explanation of when the contract might be viola
tim (not reviewing) 2010/12/24 04:23:47 Ok. I'll adapt the 'Mode' comment in the header a
81 switch (mode_) {
82 case CONFIGURATION_MODE:
83 if (purpose != SyncSessionJob::CONFIGURATION)
84 return false;
85 break;
86 case NORMAL_MODE:
87 if (purpose != SyncSessionJob::POLL && purpose != SyncSessionJob::NUDGE)
88 return false;
89 break;
90 default:
91 NOTREACHED() << "Unknown SyncerThread Mode: " << mode_;
92 return false;
93 }
94
95 // Freshness condition.
96 if (purpose == SyncSessionJob::NUDGE &&
97 (scheduled_start < last_sync_session_end_time_)) {
98 return false;
99 }
100
101 return server_connection_ok_;
102 }
103
104 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
105 NudgeSource source) {
106 switch (source) {
107 case NUDGE_SOURCE_NOTIFICATION:
108 return GetUpdatesCallerInfo::NOTIFICATION;
109 case NUDGE_SOURCE_LOCAL:
110 return GetUpdatesCallerInfo::LOCAL;
111 case NUDGE_SOURCE_CONTINUATION:
112 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
113 case NUDGE_SOURCE_CLEAR_PRIVATE_DATA:
114 return GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA;
115 case NUDGE_SOURCE_UNKNOWN:
116 default:
117 return GetUpdatesCallerInfo::UNKNOWN;
118 }
119 }
120
121 void SyncerThread::ScheduleNudge(const TimeDelta& delay,
122 NudgeSource source, const ModelTypeBitSet& types) {
123 if (!thread_.IsRunning()) {
124 NOTREACHED();
125 return;
126 }
127
128 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
129 this, &SyncerThread::ScheduleNudgeImpl, delay, source, types));
130 }
131
132 // Functor for std::find_if to search by ModelSafeGroup.
133 struct WorkerGroupIs {
134 explicit WorkerGroupIs(ModelSafeGroup group) : group(group) {}
135 bool operator()(ModelSafeWorker* w) {
136 return group == w->GetModelSafeGroup();
137 }
138 ModelSafeGroup group;
139 };
140
141 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay,
142 NudgeSource source, const ModelTypeBitSet& model_types) {
143 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
144 TimeTicks rough_start = TimeTicks::Now() + delay;
145 if (!ShouldRunJob(SyncSessionJob::NUDGE, rough_start))
146 return;
147
148 // Note we currently nudge for all types regardless of the ones incurring
149 // the nudge. Doing different would throw off some syncer commands like
150 // CleanupDisabledTypes. We may want to change this in the future.
151 ModelSafeRoutingInfo routes;
152 std::vector<ModelSafeWorker*> workers;
153 session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
154 session_context_->registrar()->GetWorkers(&workers);
155 SyncSourceInfo info(GetUpdatesFromNudgeSource(source), model_types);
156
157 scoped_refptr<SyncSession> session = new SyncSession(
158 session_context_.get(), this, info, routes, workers);
159
160 if (pending_nudge_.get()) {
161 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1))
162 return;
163
164 pending_nudge_->session->Coalesce(session);
165 if (!IsBackingOff())
166 return;
167 }
168 ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, session.release());
169 }
170
171 // Helper to extract the routing info and workers corresponding to types in
172 // |types| from |registrar|.
173 void GetModelSafeParamsForTypes(const ModelTypeBitSet& types,
174 ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes,
175 std::vector<ModelSafeWorker*>* workers) {
176 ModelSafeRoutingInfo r_tmp;
177 std::vector<ModelSafeWorker*> w_tmp;
178 registrar->GetModelSafeRoutingInfo(&r_tmp);
179 registrar->GetWorkers(&w_tmp);
180
181 typedef std::vector<ModelSafeWorker*>::const_iterator iter;
182 for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) {
183 if (!types.test(i))
184 continue;
185 syncable::ModelType t = syncable::ModelTypeFromInt(i);
186 DCHECK_EQ(1U, r_tmp.count(t));
187 (*routes)[t] = r_tmp[t];
188 iter it = std::find_if(w_tmp.begin(), w_tmp.end(), WorkerGroupIs(r_tmp[t]));
189 DCHECK(w_tmp.end() != it);
190 workers->push_back(*it);
191 }
192
193 iter it = std::find_if(w_tmp.begin(), w_tmp.end(),
194 WorkerGroupIs(GROUP_PASSIVE));
195 DCHECK(w_tmp.end() != it);
196 workers->push_back(*it);
197 }
198
199 void SyncerThread::ScheduleConfig(const TimeDelta& delay,
200 const ModelTypeBitSet& types) {
201 if (!thread_.IsRunning()) {
202 NOTREACHED();
203 return;
204 }
205
206 ModelSafeRoutingInfo routes;
207 std::vector<ModelSafeWorker*> workers;
208 GetModelSafeParamsForTypes(types, session_context_->registrar(),
209 &routes, &workers);
210
211 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
212 this, &SyncerThread::ScheduleConfigImpl, delay, routes, workers));
213 }
214
215 void SyncerThread::ScheduleConfigImpl(const TimeDelta& delay,
216 const ModelSafeRoutingInfo& routing_info,
217 const std::vector<ModelSafeWorker*>& workers) {
218 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
219 NOTIMPLEMENTED() << "TODO(tim)";
220 }
221
222 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay,
223 SyncSessionJob::Purpose purpose, sessions::SyncSession* session) {
224 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
225 SyncSessionJob job = {purpose, TimeTicks::Now() + delay, session};
226 if (purpose == SyncSessionJob::NUDGE) {
227 DCHECK(!pending_nudge_.get());
228 pending_nudge_.reset(new SyncSessionJob(job));
229 }
230 MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this,
231 &SyncerThread::DoSyncSessionJob, job), delay.InMilliseconds());
232 }
233
234 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
235 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
236
237 if (job.purpose == SyncSessionJob::NUDGE) {
238 DCHECK(pending_nudge_.get());
239 if (pending_nudge_->session != job.session)
240 return; // Another nudge must have been scheduled in in the meantime.
241 pending_nudge_.reset();
242 } else if (job.purpose == SyncSessionJob::CONFIGURATION) {
243 NOTIMPLEMENTED() << "TODO(tim): SyncShare [DOWNLOAD_UPDATES,APPLY_UPDATES]";
244 }
245
246 bool has_more_to_sync = true;
247 while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) {
248 VLOG(1) << "SyncerThread: Calling SyncShare.";
249 // Synchronously perform the sync session from this thread.
250 syncer_->SyncShare(job.session);
251 has_more_to_sync = job.session->HasMoreToSync();
252 if (has_more_to_sync)
253 job.session->ResetTransientState();
254 }
255 VLOG(1) << "SyncerThread: Done SyncShare looping.";
256 FinishSyncSessionJob(job);
257 }
258
259 void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) {
260 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
261 // Update timing information for how often datatypes are triggering nudges.
262 base::TimeTicks now = TimeTicks::Now();
263 for (size_t i = syncable::FIRST_REAL_MODEL_TYPE;
264 i < job.session->source().second.size() &&
265 !last_sync_session_end_time_.is_null();
266 ++i) {
267 if (job.session->source().second[i]) {
268 syncable::PostTimeToTypeHistogram(syncable::ModelTypeFromInt(i),
269 now - last_sync_session_end_time_);
270 }
271 }
272 last_sync_session_end_time_ = now;
273 if (IsSyncingCurrentlySilenced())
274 return; // Nothing to do.
275
276 VLOG(1) << "Updating the next polling time after SyncMain";
277 ScheduleNextSync(job);
278 }
279
280 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) {
281 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
282 const bool work_to_do =
283 old_job.session->status_controller()->num_server_changes_remaining() >
284 old_job.session->status_controller()->ComputeMaxLocalTimestamp() ||
285 old_job.session->status_controller()->unsynced_handles().size() > 0;
286 VLOG(1) << "syncer has work to do: " << work_to_do;
287
288 AdjustPolling(&old_job);
289
290 // TODO(tim): Old impl had special code if notifications disabled. Needed?
291 if (!work_to_do)
292 return;
293
294 if (old_job.session->source().first ==
295 GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) {
296 // We don't seem to have made forward progress. Start or extend backoff.
297 HandleConsecutiveContinuationError(old_job);
298 } else if (IsBackingOff()) {
299 // We weren't continuing but we're in backoff; must have been a nudge.
300 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
301 DCHECK(!wait_interval_->had_nudge);
302 wait_interval_->had_nudge = true;
303 wait_interval_->timer.Reset();
304 } else {
305 // We weren't continuing and we aren't in backoff. Schedule a normal
306 // continuation.
307 ScheduleNudgeImpl(GetRecommendedDelay(TimeDelta::FromSeconds(0)),
308 NUDGE_SOURCE_CONTINUATION,
309 old_job.session->source().second);
310 }
311 }
312
313 void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) {
314 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
315 TimeDelta poll = (!session_context_->notifications_enabled()) ?
316 syncer_short_poll_interval_seconds_ :
317 syncer_long_poll_interval_seconds_;
318 bool rate_changed = !poll_timer_.IsRunning() ||
319 poll != poll_timer_.GetCurrentDelay();
320
321 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed)
322 poll_timer_.Reset();
323
324 if (!rate_changed)
325 return;
326
327 // Adjust poll rate.
328 poll_timer_.Stop();
329 poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback);
330 }
331
332 void SyncerThread::HandleConsecutiveContinuationError(
333 const SyncSessionJob& old_job) {
334 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
335 DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning());
336 SyncSession* old = old_job.session;
337 SyncSession* s(new SyncSession(session_context_.get(), this,
338 old->source(), old->routing_info(), old->workers()));
339 TimeDelta length = GetRecommendedDelay(
340 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1));
341 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
342 length));
343 SyncSessionJob job = {SyncSessionJob::NUDGE, TimeTicks::Now() + length, s};
344 pending_nudge_.reset(new SyncSessionJob(job));
345 wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob);
346 }
347
348 // static
349 TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) {
350 if (last_delay.InSeconds() >= kMaxBackoffSeconds)
351 return TimeDelta::FromSeconds(kMaxBackoffSeconds);
352
353 // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2
354 int64 backoff_s =
355 std::max(1LL, last_delay.InSeconds() * kBackoffRandomizationFactor);
356
357 // Flip a coin to randomize backoff interval by +/- 50%.
358 int rand_sign = base::RandInt(0, 1) * 2 - 1;
359
360 // Truncation is adequate for rounding here.
361 backoff_s = backoff_s +
362 (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor));
363
364 // Cap the backoff interval.
365 backoff_s = std::max(1LL, std::min(backoff_s, kMaxBackoffSeconds));
366
367 return TimeDelta::FromSeconds(backoff_s);
368 }
369
370 void SyncerThread::Stop() {
371 syncer_->RequestEarlyExit(); // Safe to call from any thread.
372 thread_.Stop();
373 Notify(SyncEngineEvent::SYNCER_THREAD_EXITING);
374 }
375
376 void SyncerThread::DoCanaryJob() {
377 DCHECK(pending_nudge_.get());
378 wait_interval_->had_nudge = false;
379 DoSyncSessionJob(*pending_nudge_);
380 }
381
382 void SyncerThread::PollTimerCallback() {
383 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
384 ModelSafeRoutingInfo r;
385 std::vector<ModelSafeWorker*> w;
386 session_context_->registrar()->GetModelSafeRoutingInfo(&r);
387 session_context_->registrar()->GetWorkers(&w);
388 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, ModelTypeBitSet());
389 SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w);
390 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s);
391 }
392
393 void SyncerThread::Unthrottle() {
394 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
395 wait_interval_.reset();
396 }
397
398 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) {
399 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
400 session_context_->NotifyListeners(SyncEngineEvent(cause));
401 }
402
403 bool SyncerThread::IsBackingOff() const {
404 return wait_interval_.get() && wait_interval_->mode ==
405 WaitInterval::EXPONENTIAL_BACKOFF;
406 }
407
408 void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) {
409 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
410 silenced_until - TimeTicks::Now()));
411 wait_interval_->timer.Start(wait_interval_->length, this,
412 &SyncerThread::Unthrottle);
413 }
414
415 bool SyncerThread::IsSyncingCurrentlySilenced() {
416 return wait_interval_.get() && wait_interval_->mode ==
417 WaitInterval::THROTTLED;
418 }
419
420 void SyncerThread::OnReceivedShortPollIntervalUpdate(
421 const base::TimeDelta& new_interval) {
422 syncer_short_poll_interval_seconds_ = new_interval;
423 }
424
425 void SyncerThread::OnReceivedLongPollIntervalUpdate(
426 const base::TimeDelta& new_interval) {
427 syncer_long_poll_interval_seconds_ = new_interval;
428 }
429
430 void SyncerThread::OnShouldStopSyncingPermanently() {
431 syncer_->RequestEarlyExit(); // Thread-safe.
432 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
433 }
434
435 void SyncerThread::OnServerConnectionEvent(
436 const ServerConnectionEvent& event) {
437 NOTIMPLEMENTED();
438 }
439
440 } // s3
441 } // browser_sync
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698