Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(157)

Side by Side Diff: chrome/browser/sync/engine/syncer_thread2.cc

Issue 5939006: sync: beginnings of MessageLoop based SyncerThread (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: revert syncer_thread.cc Created 9 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "chrome/browser/sync/engine/syncer_thread2.h"
6
7 #include "base/rand_util.h"
8 #include "chrome/browser/sync/engine/syncer.h"
9
10 using base::TimeDelta;
11 using base::TimeTicks;
12
13 namespace browser_sync {
14
15 using sessions::SyncSession;
16 using sessions::SyncSessionSnapshot;
17 using sessions::SyncSourceInfo;
18 using syncable::ModelTypeBitSet;
19 using sync_pb::GetUpdatesCallerInfo;
20
21 namespace s3 {
22
23 SyncerThread::SyncerThread(sessions::SyncSessionContext* context,
24 Syncer* syncer)
25 : thread_("SyncEngine_SyncerThread"),
26 syncer_short_poll_interval_seconds_(
27 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
28 syncer_long_poll_interval_seconds_(
29 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
30 server_connection_ok_(false),
31 syncer_(syncer),
32 session_context_(context) {
33 }
34
35 SyncerThread::~SyncerThread() {
36 DCHECK(!thread_.IsRunning());
37 }
38
39 void SyncerThread::Start(Mode mode) {
40 if (!thread_.IsRunning() && !thread_.Start()) {
41 NOTREACHED() << "Unable to start SyncerThread.";
42 return;
43 }
44
45 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
46 this, &SyncerThread::StartImpl, mode));
47 }
48
49 void SyncerThread::StartImpl(Mode mode) {
50 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
51 DCHECK(!session_context_->account_name().empty());
52 DCHECK(syncer_.get());
53 mode_ = mode;
54 AdjustPolling(NULL); // Will kick start poll timer if needed.
55 }
56
57 bool SyncerThread::ShouldRunJob(SyncSessionJob::Purpose purpose,
58 const TimeTicks& scheduled_start) {
59 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
60
61 // Check wait interval.
62 if (wait_interval_.get()) {
63 if (wait_interval_->mode == WaitInterval::THROTTLED) {
64 return false;
65 } else {
akalin 2011/01/20 00:47:57 no need for else since the if returns.
tim (not reviewing) 2011/01/25 03:23:05 Done.
66 DCHECK_EQ(wait_interval_->mode,
67 WaitInterval::EXPONENTIAL_BACKOFF);
68 switch (purpose) {
akalin 2011/01/20 00:47:57 isn't this clearer as: DCHECK(purpose == POLL ||
tim (not reviewing) 2011/01/25 03:23:05 Done.
69 case SyncSessionJob::NUDGE:
70 if (!wait_interval_->had_nudge)
71 break;
72 default:
73 DCHECK(purpose == SyncSessionJob::POLL ||
74 purpose == SyncSessionJob::NUDGE);
75 return false;
76 }
77 }
78 }
79
80 // Mode / purpose contract (See 'Mode' enum in header). Don't run jobs that
81 // were intended for a normal sync if we are in configuration mode, and vice
82 // versa.
83 switch (mode_) {
84 case CONFIGURATION_MODE:
85 if (purpose != SyncSessionJob::CONFIGURATION)
86 return false;
87 break;
88 case NORMAL_MODE:
89 if (purpose != SyncSessionJob::POLL && purpose != SyncSessionJob::NUDGE)
90 return false;
91 break;
92 default:
93 NOTREACHED() << "Unknown SyncerThread Mode: " << mode_;
94 return false;
95 }
96
97 // Freshness condition.
98 if (purpose == SyncSessionJob::NUDGE &&
99 (scheduled_start < last_sync_session_end_time_)) {
100 return false;
101 }
102
103 return server_connection_ok_;
104 }
105
106 GetUpdatesCallerInfo::GetUpdatesSource GetUpdatesFromNudgeSource(
akalin 2011/01/20 00:47:57 anon namespace?
tim (not reviewing) 2011/01/25 03:23:05 Done.
107 NudgeSource source) {
108 switch (source) {
109 case NUDGE_SOURCE_NOTIFICATION:
110 return GetUpdatesCallerInfo::NOTIFICATION;
111 case NUDGE_SOURCE_LOCAL:
112 return GetUpdatesCallerInfo::LOCAL;
113 case NUDGE_SOURCE_CONTINUATION:
114 return GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
115 case NUDGE_SOURCE_CLEAR_PRIVATE_DATA:
116 return GetUpdatesCallerInfo::CLEAR_PRIVATE_DATA;
117 case NUDGE_SOURCE_UNKNOWN:
118 default:
119 return GetUpdatesCallerInfo::UNKNOWN;
120 }
121 }
122
123 void SyncerThread::ScheduleNudge(const TimeDelta& delay,
124 NudgeSource source, const ModelTypeBitSet& types) {
125 if (!thread_.IsRunning()) {
126 NOTREACHED();
127 return;
128 }
129
130 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
131 this, &SyncerThread::ScheduleNudgeImpl, delay, source, types));
132 }
133
134 // Functor for std::find_if to search by ModelSafeGroup.
akalin 2011/01/20 00:47:57 anon namespace?
tim (not reviewing) 2011/01/25 03:23:05 Done.
135 struct WorkerGroupIs {
136 explicit WorkerGroupIs(ModelSafeGroup group) : group(group) {}
137 bool operator()(ModelSafeWorker* w) {
138 return group == w->GetModelSafeGroup();
139 }
140 ModelSafeGroup group;
141 };
142
143 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay,
144 NudgeSource source, const ModelTypeBitSet& model_types) {
145 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
146 TimeTicks rough_start = TimeTicks::Now() + delay;
147 if (!ShouldRunJob(SyncSessionJob::NUDGE, rough_start))
148 return;
149
150 // Note we currently nudge for all types regardless of the ones incurring
151 // the nudge. Doing different would throw off some syncer commands like
152 // CleanupDisabledTypes. We may want to change this in the future.
153 ModelSafeRoutingInfo routes;
154 std::vector<ModelSafeWorker*> workers;
155 session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
156 session_context_->registrar()->GetWorkers(&workers);
157 SyncSourceInfo info(GetUpdatesFromNudgeSource(source), model_types);
158
159 scoped_ptr<SyncSession> session(new SyncSession(
160 session_context_.get(), this, info, routes, workers));
161
162 if (pending_nudge_.get()) {
163 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1))
164 return;
165
166 pending_nudge_->session->Coalesce(session.get());
167 if (!IsBackingOff())
168 return;
169 }
170 ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, session.release());
171 }
172
173 // Helper to extract the routing info and workers corresponding to types in
174 // |types| from |registrar|.
175 void GetModelSafeParamsForTypes(const ModelTypeBitSet& types,
176 ModelSafeWorkerRegistrar* registrar, ModelSafeRoutingInfo* routes,
177 std::vector<ModelSafeWorker*>* workers) {
178 ModelSafeRoutingInfo r_tmp;
179 std::vector<ModelSafeWorker*> w_tmp;
180 registrar->GetModelSafeRoutingInfo(&r_tmp);
181 registrar->GetWorkers(&w_tmp);
182
183 typedef std::vector<ModelSafeWorker*>::const_iterator iter;
184 for (size_t i = syncable::FIRST_REAL_MODEL_TYPE; i < types.size(); ++i) {
185 if (!types.test(i))
186 continue;
187 syncable::ModelType t = syncable::ModelTypeFromInt(i);
188 DCHECK_EQ(1U, r_tmp.count(t));
189 (*routes)[t] = r_tmp[t];
190 iter it = std::find_if(w_tmp.begin(), w_tmp.end(), WorkerGroupIs(r_tmp[t]));
191 DCHECK(w_tmp.end() != it);
akalin 2011/01/20 00:47:57 maybe: if (it != w_tmp.end() { workers->push_ba
tim (not reviewing) 2011/01/25 03:23:05 Yeah, probably safer.
192 workers->push_back(*it);
193 }
194
195 iter it = std::find_if(w_tmp.begin(), w_tmp.end(),
196 WorkerGroupIs(GROUP_PASSIVE));
197 DCHECK(w_tmp.end() != it);
akalin 2011/01/20 00:47:57 here, too
tim (not reviewing) 2011/01/25 03:23:05 Done.
198 workers->push_back(*it);
199 }
200
201 void SyncerThread::ScheduleConfig(const TimeDelta& delay,
202 const ModelTypeBitSet& types) {
203 if (!thread_.IsRunning()) {
204 NOTREACHED();
205 return;
206 }
207
208 ModelSafeRoutingInfo routes;
209 std::vector<ModelSafeWorker*> workers;
210 GetModelSafeParamsForTypes(types, session_context_->registrar(),
211 &routes, &workers);
212
213 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
214 this, &SyncerThread::ScheduleConfigImpl, delay, routes, workers));
215 }
216
217 void SyncerThread::ScheduleConfigImpl(const TimeDelta& delay,
218 const ModelSafeRoutingInfo& routing_info,
219 const std::vector<ModelSafeWorker*>& workers) {
220 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
221 NOTIMPLEMENTED() << "TODO(tim)";
222 }
223
224 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay,
225 SyncSessionJob::Purpose purpose, sessions::SyncSession* session) {
226 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
227 SyncSessionJob job = {purpose, TimeTicks::Now() + delay,
228 make_linked_ptr(session)};
229 if (purpose == SyncSessionJob::NUDGE) {
230 DCHECK(!pending_nudge_.get());
231 pending_nudge_.reset(new SyncSessionJob(job));
232 }
233 MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this,
234 &SyncerThread::DoSyncSessionJob, job), delay.InMilliseconds());
235 }
236
237 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
238 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
239
240 if (job.purpose == SyncSessionJob::NUDGE) {
241 DCHECK(pending_nudge_.get());
242 if (pending_nudge_->session != job.session)
243 return; // Another nudge must have been scheduled in in the meantime.
244 pending_nudge_.reset();
245 } else if (job.purpose == SyncSessionJob::CONFIGURATION) {
246 NOTIMPLEMENTED() << "TODO(tim): SyncShare [DOWNLOAD_UPDATES,APPLY_UPDATES]";
247 }
248
249 bool has_more_to_sync = true;
250 while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) {
251 VLOG(1) << "SyncerThread: Calling SyncShare.";
252 // Synchronously perform the sync session from this thread.
253 syncer_->SyncShare(job.session.get());
254 has_more_to_sync = job.session->HasMoreToSync();
255 if (has_more_to_sync)
256 job.session->ResetTransientState();
257 }
258 VLOG(1) << "SyncerThread: Done SyncShare looping.";
259 FinishSyncSessionJob(job);
260 }
261
262 void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) {
263 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
264 // Update timing information for how often datatypes are triggering nudges.
265 base::TimeTicks now = TimeTicks::Now();
266 for (size_t i = syncable::FIRST_REAL_MODEL_TYPE;
267 i < job.session->source().second.size() &&
268 !last_sync_session_end_time_.is_null();
269 ++i) {
270 if (job.session->source().second[i]) {
271 syncable::PostTimeToTypeHistogram(syncable::ModelTypeFromInt(i),
272 now - last_sync_session_end_time_);
273 }
274 }
275 last_sync_session_end_time_ = now;
276 if (IsSyncingCurrentlySilenced())
277 return; // Nothing to do.
278
279 VLOG(1) << "Updating the next polling time after SyncMain";
280 ScheduleNextSync(job);
281 }
282
283 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) {
284 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
285 const bool work_to_do =
286 old_job.session->status_controller()->num_server_changes_remaining() > 0
287 || old_job.session->status_controller()->unsynced_handles().size() > 0;
288 VLOG(1) << "syncer has work to do: " << work_to_do;
289
290 AdjustPolling(&old_job);
291
292 // TODO(tim): Old impl had special code if notifications disabled. Needed?
293 if (!work_to_do)
294 return;
295
296 if (old_job.session->source().first ==
297 GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) {
298 // We don't seem to have made forward progress. Start or extend backoff.
299 HandleConsecutiveContinuationError(old_job);
300 } else if (IsBackingOff()) {
301 // We weren't continuing but we're in backoff; must have been a nudge.
302 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
303 DCHECK(!wait_interval_->had_nudge);
304 wait_interval_->had_nudge = true;
305 wait_interval_->timer.Reset();
306 } else {
307 // We weren't continuing and we aren't in backoff. Schedule a normal
308 // continuation.
309 ScheduleNudgeImpl(GetRecommendedDelay(TimeDelta::FromSeconds(0)),
310 NUDGE_SOURCE_CONTINUATION,
Nicolas Zea 2011/01/20 20:01:57 Just for my understanding here, is this called whe
tim (not reviewing) 2011/01/25 03:23:05 Typically this means some sort of error occurred b
311 old_job.session->source().second);
312 }
313 }
314
315 void SyncerThread::AdjustPolling(const SyncSessionJob* old_job) {
316 DCHECK(thread_.IsRunning());
317 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
318
319 TimeDelta poll = (!session_context_->notifications_enabled()) ?
320 syncer_short_poll_interval_seconds_ :
321 syncer_long_poll_interval_seconds_;
322 bool rate_changed = !poll_timer_.IsRunning() ||
323 poll != poll_timer_.GetCurrentDelay();
324
325 if (old_job && old_job->purpose != SyncSessionJob::POLL && !rate_changed)
326 poll_timer_.Reset();
327
328 if (!rate_changed)
329 return;
330
331 // Adjust poll rate.
332 poll_timer_.Stop();
333 poll_timer_.Start(poll, this, &SyncerThread::PollTimerCallback);
334 }
335
336 void SyncerThread::HandleConsecutiveContinuationError(
337 const SyncSessionJob& old_job) {
338 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
339 DCHECK(!IsBackingOff() || !wait_interval_->timer.IsRunning());
340 SyncSession* old = old_job.session.get();
341 SyncSession* s(new SyncSession(session_context_.get(), this,
342 old->source(), old->routing_info(), old->workers()));
343 TimeDelta length = GetRecommendedDelay(
344 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1));
345 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
346 length));
347 SyncSessionJob job = {SyncSessionJob::NUDGE, TimeTicks::Now() + length,
348 make_linked_ptr(s)};
349 pending_nudge_.reset(new SyncSessionJob(job));
350 wait_interval_->timer.Start(length, this, &SyncerThread::DoCanaryJob);
351 }
352
353 // static
354 TimeDelta SyncerThread::GetRecommendedDelay(const TimeDelta& last_delay) {
355 if (last_delay.InSeconds() >= kMaxBackoffSeconds)
356 return TimeDelta::FromSeconds(kMaxBackoffSeconds);
357
358 // This calculates approx. base_delay_seconds * 2 +/- base_delay_seconds / 2
359 int64 backoff_s =
360 std::max(1LL, last_delay.InSeconds() * kBackoffRandomizationFactor);
361
362 // Flip a coin to randomize backoff interval by +/- 50%.
363 int rand_sign = base::RandInt(0, 1) * 2 - 1;
364
365 // Truncation is adequate for rounding here.
366 backoff_s = backoff_s +
367 (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor));
368
369 // Cap the backoff interval.
370 backoff_s = std::max(1LL, std::min(backoff_s, kMaxBackoffSeconds));
371
372 return TimeDelta::FromSeconds(backoff_s);
373 }
374
375 void SyncerThread::Stop() {
376 syncer_->RequestEarlyExit(); // Safe to call from any thread.
377 thread_.Stop();
378 Notify(SyncEngineEvent::SYNCER_THREAD_EXITING);
379 }
380
381 void SyncerThread::DoCanaryJob() {
382 DCHECK(pending_nudge_.get());
383 wait_interval_->had_nudge = false;
384 DoSyncSessionJob(*pending_nudge_);
385 }
386
387 void SyncerThread::PollTimerCallback() {
388 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
389 ModelSafeRoutingInfo r;
390 std::vector<ModelSafeWorker*> w;
391 session_context_->registrar()->GetModelSafeRoutingInfo(&r);
392 session_context_->registrar()->GetWorkers(&w);
393 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, ModelTypeBitSet());
394 SyncSession* s = new SyncSession(session_context_.get(), this, info, r, w);
395 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s);
396 }
397
398 void SyncerThread::Unthrottle() {
399 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
400 wait_interval_.reset();
401 }
402
403 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) {
404 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
405 session_context_->NotifyListeners(SyncEngineEvent(cause));
406 }
407
408 bool SyncerThread::IsBackingOff() const {
409 return wait_interval_.get() && wait_interval_->mode ==
410 WaitInterval::EXPONENTIAL_BACKOFF;
411 }
412
413 void SyncerThread::OnSilencedUntil(const base::TimeTicks& silenced_until) {
414 wait_interval_.reset(new WaitInterval(WaitInterval::THROTTLED,
415 silenced_until - TimeTicks::Now()));
416 wait_interval_->timer.Start(wait_interval_->length, this,
417 &SyncerThread::Unthrottle);
418 }
419
420 bool SyncerThread::IsSyncingCurrentlySilenced() {
akalin 2011/01/20 00:47:57 const method?
tim (not reviewing) 2011/01/25 03:23:05 Yeah, probably should be - I started to make this
421 return wait_interval_.get() && wait_interval_->mode ==
422 WaitInterval::THROTTLED;
423 }
424
425 void SyncerThread::OnReceivedShortPollIntervalUpdate(
426 const base::TimeDelta& new_interval) {
427 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
428 syncer_short_poll_interval_seconds_ = new_interval;
429 }
430
431 void SyncerThread::OnReceivedLongPollIntervalUpdate(
432 const base::TimeDelta& new_interval) {
433 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
434 syncer_long_poll_interval_seconds_ = new_interval;
435 }
436
437 void SyncerThread::OnShouldStopSyncingPermanently() {
438 syncer_->RequestEarlyExit(); // Thread-safe.
439 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
440 }
441
442 void SyncerThread::OnServerConnectionEvent(
443 const ServerConnectionEvent& event) {
444 NOTIMPLEMENTED();
445 }
446
447 void SyncerThread::set_notifications_enabled(bool notifications_enabled) {
448 session_context_->set_notifications_enabled(notifications_enabled);
449 }
450
451 } // s3
452 } // browser_sync
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698