Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(301)

Side by Side Diff: chrome/browser/sync/engine/syncer_thread.cc

Issue 7024058: [Sync] Clean up sync logging (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: even more logging Created 9 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/sync/engine/syncer_thread.h" 5 #include "chrome/browser/sync/engine/syncer_thread.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/rand_util.h" 9 #include "base/rand_util.h"
10 #include "chrome/browser/sync/engine/syncer.h" 10 #include "chrome/browser/sync/engine/syncer.h"
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
72 default: 72 default:
73 NOTREACHED(); 73 NOTREACHED();
74 } 74 }
75 75
76 return GetUpdatesCallerInfo::UNKNOWN; 76 return GetUpdatesCallerInfo::UNKNOWN;
77 } 77 }
78 78
79 SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) 79 SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length)
80 : mode(mode), had_nudge(false), length(length) { } 80 : mode(mode), had_nudge(false), length(length) { }
81 81
82 SyncerThread::SyncerThread(sessions::SyncSessionContext* context, 82 // Helper macro to log with the syncer thread name; useful when there
83 // are multiple syncer threads involved.
84 #define SVLOG(verbose_level) VLOG(verbose_level) << name_ << ": "
85
86 SyncerThread::SyncerThread(const std::string& name,
87 sessions::SyncSessionContext* context,
83 Syncer* syncer) 88 Syncer* syncer)
84 : thread_("SyncEngine_SyncerThread"), 89 : name_(name),
90 thread_("SyncEngine_SyncerThread"),
85 syncer_short_poll_interval_seconds_( 91 syncer_short_poll_interval_seconds_(
86 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), 92 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)),
87 syncer_long_poll_interval_seconds_( 93 syncer_long_poll_interval_seconds_(
88 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), 94 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)),
89 mode_(NORMAL_MODE), 95 mode_(NORMAL_MODE),
90 server_connection_ok_(false), 96 server_connection_ok_(false),
91 delay_provider_(new DelayProvider()), 97 delay_provider_(new DelayProvider()),
92 syncer_(syncer), 98 syncer_(syncer),
93 session_context_(context) { 99 session_context_(context) {
94 } 100 }
95 101
96 SyncerThread::~SyncerThread() { 102 SyncerThread::~SyncerThread() {
97 DCHECK(!thread_.IsRunning()); 103 DCHECK(!thread_.IsRunning());
98 } 104 }
99 105
100 void SyncerThread::CheckServerConnectionManagerStatus( 106 void SyncerThread::CheckServerConnectionManagerStatus(
101 HttpResponse::ServerConnectionCode code) { 107 HttpResponse::ServerConnectionCode code) {
108 bool old_server_connection_ok = server_connection_ok_;
102 109
103 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
104 << "Old mode: " << server_connection_ok_ << " Code: " << code;
105 // Note, be careful when adding cases here because if the SyncerThread 110 // Note, be careful when adding cases here because if the SyncerThread
106 // thinks there is no valid connection as determined by this method, it 111 // thinks there is no valid connection as determined by this method, it
107 // will drop out of *all* forward progress sync loops (it won't poll and it 112 // will drop out of *all* forward progress sync loops (it won't poll and it
108 // will queue up Talk notifications but not actually call SyncShare) until 113 // will queue up Talk notifications but not actually call SyncShare) until
109 // some external action causes a ServerConnectionManager to broadcast that 114 // some external action causes a ServerConnectionManager to broadcast that
110 // a valid connection has been re-established. 115 // a valid connection has been re-established.
111 if (HttpResponse::CONNECTION_UNAVAILABLE == code || 116 if (HttpResponse::CONNECTION_UNAVAILABLE == code ||
112 HttpResponse::SYNC_AUTH_ERROR == code) { 117 HttpResponse::SYNC_AUTH_ERROR == code) {
113 server_connection_ok_ = false; 118 server_connection_ok_ = false;
114 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
115 << " new mode:" << server_connection_ok_;
116 } else if (HttpResponse::SERVER_CONNECTION_OK == code) { 119 } else if (HttpResponse::SERVER_CONNECTION_OK == code) {
117 server_connection_ok_ = true; 120 server_connection_ok_ = true;
118 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed."
119 << " new mode:" << server_connection_ok_;
120 DoCanaryJob(); 121 DoCanaryJob();
121 } 122 }
123
124 if (old_server_connection_ok != server_connection_ok_) {
125 SVLOG(2) << "Server connection changed. Old mode: "
126 << old_server_connection_ok << ", new mode: "
127 << server_connection_ok_ << ", code: " << code;
128 }
122 } 129 }
123 130
124 void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { 131 void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) {
125 VLOG(1) << "SyncerThread(" << this << ")" << " Start called from thread " 132 SVLOG(2) << "Start called from thread "
126 << MessageLoop::current()->thread_name(); 133 << MessageLoop::current()->thread_name() << " with mode "
134 << mode;
127 if (!thread_.IsRunning()) { 135 if (!thread_.IsRunning()) {
128 VLOG(1) << "SyncerThread(" << this << ")" << " Starting thread with mode " 136 SVLOG(2) << "Starting thread with mode " << mode;
129 << mode;
130 if (!thread_.Start()) { 137 if (!thread_.Start()) {
131 NOTREACHED() << "Unable to start SyncerThread."; 138 NOTREACHED() << "Unable to start SyncerThread.";
132 return; 139 return;
133 } 140 }
134 WatchConnectionManager(); 141 WatchConnectionManager();
135 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 142 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
136 this, &SyncerThread::SendInitialSnapshot)); 143 this, &SyncerThread::SendInitialSnapshot));
137 } 144 }
138 145
139 VLOG(1) << "SyncerThread(" << this << ")" << " Entering start with mode = "
140 << mode;
141
142 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 146 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
143 this, &SyncerThread::StartImpl, mode, callback)); 147 this, &SyncerThread::StartImpl, mode, callback));
144 } 148 }
145 149
146 void SyncerThread::SendInitialSnapshot() { 150 void SyncerThread::SendInitialSnapshot() {
147 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 151 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
148 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this, 152 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this,
149 SyncSourceInfo(), ModelSafeRoutingInfo(), 153 SyncSourceInfo(), ModelSafeRoutingInfo(),
150 std::vector<ModelSafeWorker*>())); 154 std::vector<ModelSafeWorker*>()));
151 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); 155 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
152 sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot()); 156 sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot());
153 event.snapshot = &snapshot; 157 event.snapshot = &snapshot;
154 session_context_->NotifyListeners(event); 158 session_context_->NotifyListeners(event);
155 } 159 }
156 160
157 void SyncerThread::WatchConnectionManager() { 161 void SyncerThread::WatchConnectionManager() {
158 ServerConnectionManager* scm = session_context_->connection_manager(); 162 ServerConnectionManager* scm = session_context_->connection_manager();
159 CheckServerConnectionManagerStatus(scm->server_status()); 163 CheckServerConnectionManagerStatus(scm->server_status());
160 scm->AddListener(this); 164 scm->AddListener(this);
161 } 165 }
162 166
163 void SyncerThread::StartImpl(Mode mode, ModeChangeCallback* callback) { 167 void SyncerThread::StartImpl(Mode mode, ModeChangeCallback* callback) {
164 VLOG(1) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode " 168 SVLOG(2) << "Doing StartImpl with mode " << mode;
165 << mode;
166 169
167 // TODO(lipalani): This will leak if startimpl is never run. Fix it using a 170 // TODO(lipalani): This will leak if startimpl is never run. Fix it using a
168 // ThreadSafeRefcounted object. 171 // ThreadSafeRefcounted object.
169 scoped_ptr<ModeChangeCallback> scoped_callback(callback); 172 scoped_ptr<ModeChangeCallback> scoped_callback(callback);
170 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 173 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
171 DCHECK(!session_context_->account_name().empty()); 174 DCHECK(!session_context_->account_name().empty());
172 DCHECK(syncer_.get()); 175 DCHECK(syncer_.get());
173 mode_ = mode; 176 mode_ = mode;
174 AdjustPolling(NULL); // Will kick start poll timer if needed. 177 AdjustPolling(NULL); // Will kick start poll timer if needed.
175 if (scoped_callback.get()) 178 if (scoped_callback.get())
176 scoped_callback->Run(); 179 scoped_callback->Run();
177 180
178 // We just changed our mode. See if there are any pending jobs that we could 181 // We just changed our mode. See if there are any pending jobs that we could
179 // execute in the new mode. 182 // execute in the new mode.
180 DoPendingJobIfPossible(false); 183 DoPendingJobIfPossible(false);
181 } 184 }
182 185
183 SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval( 186 SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval(
184 const SyncSessionJob& job) { 187 const SyncSessionJob& job) {
185 188
186 DCHECK(wait_interval_.get()); 189 DCHECK(wait_interval_.get());
187 DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); 190 DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA);
188 191
189 VLOG(1) << "SyncerThread(" << this << ")" << " Wait interval mode : " 192 SVLOG(2) << "Wait interval mode : "
190 << wait_interval_->mode << "Wait interval had nudge : " 193 << wait_interval_->mode << "Wait interval had nudge : "
Nicolas Zea 2011/06/07 19:55:24 "Wait interval... -> ", wait interval... "is canar
akalin 2011/06/07 20:17:38 Done.
191 << wait_interval_->had_nudge << "is canary job : " 194 << wait_interval_->had_nudge << "is canary job : "
192 << job.is_canary_job; 195 << job.is_canary_job;
193 196
194 if (job.purpose == SyncSessionJob::POLL) 197 if (job.purpose == SyncSessionJob::POLL)
195 return DROP; 198 return DROP;
196 199
197 DCHECK(job.purpose == SyncSessionJob::NUDGE || 200 DCHECK(job.purpose == SyncSessionJob::NUDGE ||
198 job.purpose == SyncSessionJob::CONFIGURATION); 201 job.purpose == SyncSessionJob::CONFIGURATION);
199 if (wait_interval_->mode == WaitInterval::THROTTLED) 202 if (wait_interval_->mode == WaitInterval::THROTTLED)
200 return SAVE; 203 return SAVE;
201 204
202 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); 205 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF);
(...skipping 25 matching lines...) Expand all
228 else 231 else
229 return DROP; 232 return DROP;
230 } 233 }
231 234
232 // We are in normal mode. 235 // We are in normal mode.
233 DCHECK_EQ(mode_, NORMAL_MODE); 236 DCHECK_EQ(mode_, NORMAL_MODE);
234 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); 237 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION);
235 238
236 // Freshness condition 239 // Freshness condition
237 if (job.scheduled_start < last_sync_session_end_time_) { 240 if (job.scheduled_start < last_sync_session_end_time_) {
238 VLOG(1) << "SyncerThread(" << this << ")" 241 SVLOG(2) << "Dropping job because of freshness";
239 << " Dropping job because of freshness";
240 return DROP; 242 return DROP;
241 } 243 }
242 244
243 if (server_connection_ok_) 245 if (server_connection_ok_)
244 return CONTINUE; 246 return CONTINUE;
245 247
246 VLOG(1) << "SyncerThread(" << this << ")" 248 SVLOG(2) << "Bad server connection. Using that to decide on job.";
247 << " Bad server connection. Using that to decide on job.";
248 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; 249 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP;
249 } 250 }
250 251
251 void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) { 252 void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) {
252 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); 253 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION);
253 if (pending_nudge_.get() == NULL) { 254 if (pending_nudge_.get() == NULL) {
254 VLOG(1) << "SyncerThread(" << this << ")" 255 SVLOG(2) << "Creating a pending nudge job";
255 << " Creating a pending nudge job";
256 SyncSession* s = job.session.get(); 256 SyncSession* s = job.session.get();
257 scoped_ptr<SyncSession> session(new SyncSession(s->context(), 257 scoped_ptr<SyncSession> session(new SyncSession(s->context(),
258 s->delegate(), s->source(), s->routing_info(), s->workers())); 258 s->delegate(), s->source(), s->routing_info(), s->workers()));
259 259
260 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, 260 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start,
261 make_linked_ptr(session.release()), false, job.nudge_location); 261 make_linked_ptr(session.release()), false, job.nudge_location);
262 pending_nudge_.reset(new SyncSessionJob(new_job)); 262 pending_nudge_.reset(new SyncSessionJob(new_job));
263 263
264 return; 264 return;
265 } 265 }
266 266
267 VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge"; 267 SVLOG(2) << "Coalescing a pending nudge";
268 pending_nudge_->session->Coalesce(*(job.session.get())); 268 pending_nudge_->session->Coalesce(*(job.session.get()));
269 pending_nudge_->scheduled_start = job.scheduled_start; 269 pending_nudge_->scheduled_start = job.scheduled_start;
270 270
271 // Unfortunately the nudge location cannot be modified. So it stores the 271 // Unfortunately the nudge location cannot be modified. So it stores the
272 // location of the first caller. 272 // location of the first caller.
273 } 273 }
274 274
275 bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) { 275 bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) {
276 JobProcessDecision decision = DecideOnJob(job); 276 JobProcessDecision decision = DecideOnJob(job);
277 VLOG(1) << "SyncerThread(" << this << ")" << " Should run job, decision: " 277 SVLOG(2) << "Should run job, decision: "
278 << decision << " Job purpose " << job.purpose << "mode " << mode_; 278 << decision << " Job purpose " << job.purpose
Nicolas Zea 2011/06/07 19:55:24 ", Job purpose "
akalin 2011/06/07 20:17:38 Done.
279 << ", mode " << mode_;
279 if (decision != SAVE) 280 if (decision != SAVE)
280 return decision == CONTINUE; 281 return decision == CONTINUE;
281 282
282 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == 283 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose ==
283 SyncSessionJob::CONFIGURATION); 284 SyncSessionJob::CONFIGURATION);
284 285
285 SaveJob(job); 286 SaveJob(job);
286 return false; 287 return false;
287 } 288 }
288 289
289 void SyncerThread::SaveJob(const SyncSessionJob& job) { 290 void SyncerThread::SaveJob(const SyncSessionJob& job) {
290 DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA); 291 DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA);
291 if (job.purpose == SyncSessionJob::NUDGE) { 292 if (job.purpose == SyncSessionJob::NUDGE) {
292 VLOG(1) << "SyncerThread(" << this << ")" << " Saving a nudge job"; 293 SVLOG(2) << "Saving a nudge job";
293 InitOrCoalescePendingJob(job); 294 InitOrCoalescePendingJob(job);
294 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ 295 } else if (job.purpose == SyncSessionJob::CONFIGURATION){
295 VLOG(1) << "SyncerThread(" << this << ")" << " Saving a configuration job"; 296 SVLOG(2) << "Saving a configuration job";
296 DCHECK(wait_interval_.get()); 297 DCHECK(wait_interval_.get());
297 DCHECK(mode_ == CONFIGURATION_MODE); 298 DCHECK(mode_ == CONFIGURATION_MODE);
298 299
299 SyncSession* old = job.session.get(); 300 SyncSession* old = job.session.get();
300 SyncSession* s(new SyncSession(session_context_.get(), this, 301 SyncSession* s(new SyncSession(session_context_.get(), this,
301 old->source(), old->routing_info(), old->workers())); 302 old->source(), old->routing_info(), old->workers()));
302 SyncSessionJob new_job(job.purpose, TimeTicks::Now(), 303 SyncSessionJob new_job(job.purpose, TimeTicks::Now(),
303 make_linked_ptr(s), false, job.nudge_location); 304 make_linked_ptr(s), false, job.nudge_location);
304 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); 305 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job));
305 } // drop the rest. 306 } // drop the rest.
(...skipping 14 matching lines...) Expand all
320 return; 321 return;
321 } 322 }
322 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 323 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
323 this, &SyncerThread::ScheduleClearUserDataImpl)); 324 this, &SyncerThread::ScheduleClearUserDataImpl));
324 } 325 }
325 326
326 void SyncerThread::ScheduleNudge(const TimeDelta& delay, 327 void SyncerThread::ScheduleNudge(const TimeDelta& delay,
327 NudgeSource source, const ModelTypeBitSet& types, 328 NudgeSource source, const ModelTypeBitSet& types,
328 const tracked_objects::Location& nudge_location) { 329 const tracked_objects::Location& nudge_location) {
329 if (!thread_.IsRunning()) { 330 if (!thread_.IsRunning()) {
330 VLOG(0) << "Dropping nudge because thread is not running."; 331 LOG(INFO) << "Dropping nudge because thread is not running.";
331 NOTREACHED(); 332 NOTREACHED();
332 return; 333 return;
333 } 334 }
334 335
335 VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled"; 336 SVLOG(2) << "Nudge scheduled (source=" << source << ")";
336 337
337 ModelTypePayloadMap types_with_payloads = 338 ModelTypePayloadMap types_with_payloads =
338 syncable::ModelTypePayloadMapFromBitSet(types, std::string()); 339 syncable::ModelTypePayloadMapFromBitSet(types, std::string());
339 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 340 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
340 this, &SyncerThread::ScheduleNudgeImpl, delay, 341 this, &SyncerThread::ScheduleNudgeImpl, delay,
341 GetUpdatesFromNudgeSource(source), types_with_payloads, false, 342 GetUpdatesFromNudgeSource(source), types_with_payloads, false,
342 nudge_location)); 343 nudge_location));
343 } 344 }
344 345
345 void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, 346 void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay,
346 NudgeSource source, const ModelTypePayloadMap& types_with_payloads, 347 NudgeSource source, const ModelTypePayloadMap& types_with_payloads,
347 const tracked_objects::Location& nudge_location) { 348 const tracked_objects::Location& nudge_location) {
348 if (!thread_.IsRunning()) { 349 if (!thread_.IsRunning()) {
349 NOTREACHED(); 350 NOTREACHED();
350 return; 351 return;
351 } 352 }
352 353
353 VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads"; 354 SVLOG(2) << "Nudge scheduled with payloads (source=" << source << ")";
354 355
355 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 356 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
356 this, &SyncerThread::ScheduleNudgeImpl, delay, 357 this, &SyncerThread::ScheduleNudgeImpl, delay,
357 GetUpdatesFromNudgeSource(source), types_with_payloads, false, 358 GetUpdatesFromNudgeSource(source), types_with_payloads, false,
358 nudge_location)); 359 nudge_location));
359 } 360 }
360 361
361 void SyncerThread::ScheduleClearUserDataImpl() { 362 void SyncerThread::ScheduleClearUserDataImpl() {
362 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 363 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
363 SyncSession* session = new SyncSession(session_context_.get(), this, 364 SyncSession* session = new SyncSession(session_context_.get(), this,
364 SyncSourceInfo(), ModelSafeRoutingInfo(), 365 SyncSourceInfo(), ModelSafeRoutingInfo(),
365 std::vector<ModelSafeWorker*>()); 366 std::vector<ModelSafeWorker*>());
366 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), 367 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
367 SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE); 368 SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE);
368 } 369 }
369 370
370 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, 371 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay,
371 GetUpdatesCallerInfo::GetUpdatesSource source, 372 GetUpdatesCallerInfo::GetUpdatesSource source,
372 const ModelTypePayloadMap& types_with_payloads, 373 const ModelTypePayloadMap& types_with_payloads,
373 bool is_canary_job, const tracked_objects::Location& nudge_location) { 374 bool is_canary_job, const tracked_objects::Location& nudge_location) {
374 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 375 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
375 376
376 VLOG(1) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl"; 377 SVLOG(2) << "Running Schedule nudge impl (source=" << source << ")";
377 // Note we currently nudge for all types regardless of the ones incurring 378 // Note we currently nudge for all types regardless of the ones incurring
378 // the nudge. Doing different would throw off some syncer commands like 379 // the nudge. Doing different would throw off some syncer commands like
379 // CleanupDisabledTypes. We may want to change this in the future. 380 // CleanupDisabledTypes. We may want to change this in the future.
380 SyncSourceInfo info(source, types_with_payloads); 381 SyncSourceInfo info(source, types_with_payloads);
381 382
382 SyncSession* session(CreateSyncSession(info)); 383 SyncSession* session(CreateSyncSession(info));
383 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, 384 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay,
384 make_linked_ptr(session), is_canary_job, 385 make_linked_ptr(session), is_canary_job,
385 nudge_location); 386 nudge_location);
386 387
387 session = NULL; 388 session = NULL;
388 if (!ShouldRunJob(job)) 389 if (!ShouldRunJob(job))
389 return; 390 return;
390 391
391 if (pending_nudge_.get()) { 392 if (pending_nudge_.get()) {
392 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { 393 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) {
393 VLOG(1) << "SyncerThread(" << this << ")" << " Dropping the nudge because" 394 SVLOG(2) << "Dropping the nudge because we are in backoff";
394 << "we are in backoff";
395 return; 395 return;
396 } 396 }
397 397
398 VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing pending nudge"; 398 SVLOG(2) << "Coalescing pending nudge";
399 pending_nudge_->session->Coalesce(*(job.session.get())); 399 pending_nudge_->session->Coalesce(*(job.session.get()));
400 400
401 if (!IsBackingOff()) { 401 if (!IsBackingOff()) {
402 VLOG(1) << "SyncerThread(" << this << ")" << " Dropping a nudge because" 402 SVLOG(2) << "Dropping a nudge because"
403 << " we are not in backoff and the job was coalesced"; 403 << " we are not in backoff and the job was coalesced";
404 return; 404 return;
405 } else { 405 } else {
406 VLOG(1) << "SyncerThread(" << this << ")" 406 SVLOG(2) << "Rescheduling pending nudge";
407 << " Rescheduling pending nudge";
408 SyncSession* s = pending_nudge_->session.get(); 407 SyncSession* s = pending_nudge_->session.get();
409 job.session.reset(new SyncSession(s->context(), s->delegate(), 408 job.session.reset(new SyncSession(s->context(), s->delegate(),
410 s->source(), s->routing_info(), s->workers())); 409 s->source(), s->routing_info(), s->workers()));
411 pending_nudge_.reset(); 410 pending_nudge_.reset();
412 } 411 }
413 } 412 }
414 413
415 // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. 414 // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob.
416 ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(), 415 ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(),
417 nudge_location); 416 nudge_location);
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
458 if (it != w_tmp.end()) 457 if (it != w_tmp.end())
459 workers->push_back(*it); 458 workers->push_back(*it);
460 else 459 else
461 NOTREACHED(); 460 NOTREACHED();
462 } 461 }
463 } 462 }
464 463
465 void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types, 464 void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types,
466 sync_api::ConfigureReason reason) { 465 sync_api::ConfigureReason reason) {
467 if (!thread_.IsRunning()) { 466 if (!thread_.IsRunning()) {
468 VLOG(0) << "ScheduleConfig failed because thread is not running."; 467 LOG(INFO) << "ScheduleConfig failed because thread is not running.";
469 NOTREACHED(); 468 NOTREACHED();
470 return; 469 return;
471 } 470 }
472 471
473 VLOG(1) << "SyncerThread(" << this << ")" << " Scheduling a config"; 472 SVLOG(2) << "Scheduling a config";
474 ModelSafeRoutingInfo routes; 473 ModelSafeRoutingInfo routes;
475 std::vector<ModelSafeWorker*> workers; 474 std::vector<ModelSafeWorker*> workers;
476 GetModelSafeParamsForTypes(types, session_context_->registrar(), 475 GetModelSafeParamsForTypes(types, session_context_->registrar(),
477 &routes, &workers); 476 &routes, &workers);
478 477
479 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 478 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
480 this, &SyncerThread::ScheduleConfigImpl, routes, workers, 479 this, &SyncerThread::ScheduleConfigImpl, routes, workers,
481 GetSourceFromReason(reason))); 480 GetSourceFromReason(reason)));
482 } 481 }
483 482
484 void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, 483 void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info,
485 const std::vector<ModelSafeWorker*>& workers, 484 const std::vector<ModelSafeWorker*>& workers,
486 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { 485 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) {
487 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 486 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
488 487
489 VLOG(1) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl..."; 488 SVLOG(2) << "ScheduleConfigImpl...";
490 // TODO(tim): config-specific GetUpdatesCallerInfo value? 489 // TODO(tim): config-specific GetUpdatesCallerInfo value?
491 SyncSession* session = new SyncSession(session_context_.get(), this, 490 SyncSession* session = new SyncSession(session_context_.get(), this,
492 SyncSourceInfo(source, 491 SyncSourceInfo(source,
493 syncable::ModelTypePayloadMapFromRoutingInfo( 492 syncable::ModelTypePayloadMapFromRoutingInfo(
494 routing_info, std::string())), 493 routing_info, std::string())),
495 routing_info, workers); 494 routing_info, workers);
496 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), 495 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0),
497 SyncSessionJob::CONFIGURATION, session, FROM_HERE); 496 SyncSessionJob::CONFIGURATION, session, FROM_HERE);
498 } 497 }
499 498
500 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, 499 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay,
501 SyncSessionJob::SyncSessionJobPurpose purpose, 500 SyncSessionJob::SyncSessionJobPurpose purpose,
502 sessions::SyncSession* session, 501 sessions::SyncSession* session,
503 const tracked_objects::Location& nudge_location) { 502 const tracked_objects::Location& nudge_location) {
504 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 503 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
505 504
506 SyncSessionJob job(purpose, TimeTicks::Now() + delay, 505 SyncSessionJob job(purpose, TimeTicks::Now() + delay,
507 make_linked_ptr(session), false, nudge_location); 506 make_linked_ptr(session), false, nudge_location);
508 if (purpose == SyncSessionJob::NUDGE) { 507 if (purpose == SyncSessionJob::NUDGE) {
509 VLOG(1) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in" 508 SVLOG(2) << "Resetting pending_nudge in"
510 << " ScheduleSyncSessionJob"; 509 << " ScheduleSyncSessionJob";
511 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); 510 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session);
512 pending_nudge_.reset(new SyncSessionJob(job)); 511 pending_nudge_.reset(new SyncSessionJob(job));
513 } 512 }
514 VLOG(1) << "SyncerThread(" << this << ")" 513 SVLOG(2) << "Posting job to execute in DoSyncSessionJob. Job purpose "
515 << " Posting job to execute in DoSyncSessionJob. Job purpose " 514 << job.purpose;
516 << job.purpose;
517 MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, 515 MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this,
518 &SyncerThread::DoSyncSessionJob, job), 516 &SyncerThread::DoSyncSessionJob, job),
519 delay.InMilliseconds()); 517 delay.InMilliseconds());
520 } 518 }
521 519
522 void SyncerThread::SetSyncerStepsForPurpose( 520 void SyncerThread::SetSyncerStepsForPurpose(
523 SyncSessionJob::SyncSessionJobPurpose purpose, 521 SyncSessionJob::SyncSessionJobPurpose purpose,
524 SyncerStep* start, SyncerStep* end) { 522 SyncerStep* start, SyncerStep* end) {
525 *end = SYNCER_END; 523 *end = SYNCER_END;
526 switch (purpose) { 524 switch (purpose) {
(...skipping 18 matching lines...) Expand all
545 if (!ShouldRunJob(job)) { 543 if (!ShouldRunJob(job)) {
546 LOG(WARNING) << "Not executing job at DoSyncSessionJob, purpose = " 544 LOG(WARNING) << "Not executing job at DoSyncSessionJob, purpose = "
547 << job.purpose << " source = " 545 << job.purpose << " source = "
548 << job.session->source().updates_source; 546 << job.session->source().updates_source;
549 return; 547 return;
550 } 548 }
551 549
552 if (job.purpose == SyncSessionJob::NUDGE) { 550 if (job.purpose == SyncSessionJob::NUDGE) {
553 if (pending_nudge_.get() == NULL || 551 if (pending_nudge_.get() == NULL ||
554 pending_nudge_->session != job.session) { 552 pending_nudge_->session != job.session) {
555 VLOG(1) << "SyncerThread(" << this << ")" << "Dropping a nudge in " 553 SVLOG(2) << "Dropping a nudge in "
556 << "DoSyncSessionJob because another nudge was scheduled"; 554 << "DoSyncSessionJob because another nudge was scheduled";
557 return; // Another nudge must have been scheduled in in the meantime. 555 return; // Another nudge must have been scheduled in in the meantime.
558 } 556 }
559 pending_nudge_.reset(); 557 pending_nudge_.reset();
560 558
561 // Create the session with the latest model safe table and use it to purge 559 // Create the session with the latest model safe table and use it to purge
562 // and update any disabled or modified entries in the job. 560 // and update any disabled or modified entries in the job.
563 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); 561 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source()));
564 562
565 job.session->RebaseRoutingInfoWithLatest(session.get()); 563 job.session->RebaseRoutingInfoWithLatest(session.get());
566 } 564 }
567 VLOG(1) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose " 565 SVLOG(2) << "DoSyncSessionJob. job purpose " << job.purpose;
568 << job.purpose;
569 566
570 SyncerStep begin(SYNCER_BEGIN); 567 SyncerStep begin(SYNCER_BEGIN);
571 SyncerStep end(SYNCER_END); 568 SyncerStep end(SYNCER_END);
572 SetSyncerStepsForPurpose(job.purpose, &begin, &end); 569 SetSyncerStepsForPurpose(job.purpose, &begin, &end);
573 570
574 bool has_more_to_sync = true; 571 bool has_more_to_sync = true;
575 while (ShouldRunJob(job) && has_more_to_sync) { 572 while (ShouldRunJob(job) && has_more_to_sync) {
576 VLOG(1) << "SyncerThread(" << this << ")" 573 SVLOG(2) << "Calling SyncShare.";
577 << " SyncerThread: Calling SyncShare.";
578 // Synchronously perform the sync session from this thread. 574 // Synchronously perform the sync session from this thread.
579 syncer_->SyncShare(job.session.get(), begin, end); 575 syncer_->SyncShare(job.session.get(), begin, end);
580 has_more_to_sync = job.session->HasMoreToSync(); 576 has_more_to_sync = job.session->HasMoreToSync();
581 if (has_more_to_sync) 577 if (has_more_to_sync)
582 job.session->ResetTransientState(); 578 job.session->ResetTransientState();
583 } 579 }
584 VLOG(1) << "SyncerThread(" << this << ")" 580 SVLOG(2) << "Done SyncShare looping.";
585 << " SyncerThread: Done SyncShare looping.";
586 FinishSyncSessionJob(job); 581 FinishSyncSessionJob(job);
587 } 582 }
588 583
589 void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) { 584 void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) {
590 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { 585 if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
591 // Whatever types were part of a configuration task will have had updates 586 // Whatever types were part of a configuration task will have had updates
592 // downloaded. For that reason, we make sure they get recorded in the 587 // downloaded. For that reason, we make sure they get recorded in the
593 // event that they get disabled at a later time. 588 // event that they get disabled at a later time.
594 ModelSafeRoutingInfo r(session_context_->previous_session_routing_info()); 589 ModelSafeRoutingInfo r(session_context_->previous_session_routing_info());
595 if (!r.empty()) { 590 if (!r.empty()) {
(...skipping 18 matching lines...) Expand all
614 for (iter = job.session->source().types.begin(); 609 for (iter = job.session->source().types.begin();
615 iter != job.session->source().types.end(); 610 iter != job.session->source().types.end();
616 ++iter) { 611 ++iter) {
617 syncable::PostTimeToTypeHistogram(iter->first, 612 syncable::PostTimeToTypeHistogram(iter->first,
618 now - last_sync_session_end_time_); 613 now - last_sync_session_end_time_);
619 } 614 }
620 } 615 }
621 last_sync_session_end_time_ = now; 616 last_sync_session_end_time_ = now;
622 UpdateCarryoverSessionState(job); 617 UpdateCarryoverSessionState(job);
623 if (IsSyncingCurrentlySilenced()) { 618 if (IsSyncingCurrentlySilenced()) {
624 VLOG(1) << "SyncerThread(" << this << ")" 619 SVLOG(2) << " We are currently throttled. "
Nicolas Zea 2011/06/07 19:55:24 " We..." -> "We..."
akalin 2011/06/07 20:17:38 Done.
625 << " We are currently throttled. So not scheduling the next sync."; 620 << "So not scheduling the next sync.";
626 SaveJob(job); 621 SaveJob(job);
627 return; // Nothing to do. 622 return; // Nothing to do.
628 } 623 }
629 624
630 VLOG(1) << "SyncerThread(" << this << ")" 625 SVLOG(2) << " Updating the next polling time after SyncMain";
Nicolas Zea 2011/06/07 19:55:24 " Updating... -> "Updating"
akalin 2011/06/07 20:17:38 Done.
631 << " Updating the next polling time after SyncMain";
632 ScheduleNextSync(job); 626 ScheduleNextSync(job);
633 } 627 }
634 628
635 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { 629 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) {
636 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 630 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
637 DCHECK(!old_job.session->HasMoreToSync()); 631 DCHECK(!old_job.session->HasMoreToSync());
638 // Note: |num_server_changes_remaining| > 0 here implies that we received a 632 // Note: |num_server_changes_remaining| > 0 here implies that we received a
639 // broken response while trying to download all updates, because the Syncer 633 // broken response while trying to download all updates, because the Syncer
640 // will loop until this value is exhausted. Also, if unsynced_handles exist 634 // will loop until this value is exhausted. Also, if unsynced_handles exist
641 // but HasMoreToSync is false, this implies that the Syncer determined no 635 // but HasMoreToSync is false, this implies that the Syncer determined no
642 // forward progress was possible at this time (an error, such as an HTTP 636 // forward progress was possible at this time (an error, such as an HTTP
643 // 500, is likely to have occurred during commit). 637 // 500, is likely to have occurred during commit).
638 int num_server_changes_remaining =
639 old_job.session->status_controller()->num_server_changes_remaining();
640 size_t num_unsynced_handles =
641 old_job.session->status_controller()->unsynced_handles().size();
644 const bool work_to_do = 642 const bool work_to_do =
645 old_job.session->status_controller()->num_server_changes_remaining() > 0 643 num_server_changes_remaining > 0 || num_unsynced_handles > 0;
646 || old_job.session->status_controller()->unsynced_handles().size() > 0; 644 SVLOG(2) << "num server changes remaining: " << num_server_changes_remaining
647 VLOG(1) << "SyncerThread(" << this << ")" << " syncer has work to do: " 645 << ", num unsynced handles: " << num_unsynced_handles
648 << work_to_do; 646 << ", syncer has work to do: " << work_to_do;
649 647
650 AdjustPolling(&old_job); 648 AdjustPolling(&old_job);
651 649
652 // TODO(tim): Old impl had special code if notifications disabled. Needed? 650 // TODO(tim): Old impl had special code if notifications disabled. Needed?
653 if (!work_to_do) { 651 if (!work_to_do) {
654 // Success implies backoff relief. Note that if this was a "one-off" job 652 // Success implies backoff relief. Note that if this was a "one-off" job
655 // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was 653 // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was
656 // work_to_do before it ran this wont have changed, as jobs like this don't 654 // work_to_do before it ran this wont have changed, as jobs like this don't
657 // run a full sync cycle. So we don't need special code here. 655 // run a full sync cycle. So we don't need special code here.
658 wait_interval_.reset(); 656 wait_interval_.reset();
659 VLOG(1) << "SyncerThread(" << this << ")" 657 SVLOG(2) << " Job suceeded so not scheduling more jobs";
660 << " Job suceeded so not scheduling more jobs";
661 return; 658 return;
662 } 659 }
663 660
664 if (old_job.session->source().updates_source == 661 if (old_job.session->source().updates_source ==
665 GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { 662 GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) {
666 VLOG(1) << "SyncerThread(" << this << ")" 663 SVLOG(2) << "Job failed with source continuation";
667 << " Job failed with source continuation";
668 // We don't seem to have made forward progress. Start or extend backoff. 664 // We don't seem to have made forward progress. Start or extend backoff.
669 HandleConsecutiveContinuationError(old_job); 665 HandleConsecutiveContinuationError(old_job);
670 } else if (IsBackingOff()) { 666 } else if (IsBackingOff()) {
671 VLOG(1) << "SyncerThread(" << this << ")" 667 SVLOG(2) << "A nudge during backoff failed";
672 << " A nudge during backoff failed";
673 // We weren't continuing but we're in backoff; must have been a nudge. 668 // We weren't continuing but we're in backoff; must have been a nudge.
674 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); 669 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose);
675 DCHECK(!wait_interval_->had_nudge); 670 DCHECK(!wait_interval_->had_nudge);
676 wait_interval_->had_nudge = true; 671 wait_interval_->had_nudge = true;
677 wait_interval_->timer.Reset(); 672 wait_interval_->timer.Reset();
678 } else { 673 } else {
679 VLOG(1) << "SyncerThread(" << this << ")" 674 SVLOG(2) << "Failed. Schedule a job with continuation as source";
680 << " Failed. Schedule a job with continuation as source";
681 // We weren't continuing and we aren't in backoff. Schedule a normal 675 // We weren't continuing and we aren't in backoff. Schedule a normal
682 // continuation. 676 // continuation.
683 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { 677 if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
684 ScheduleConfigImpl(old_job.session->routing_info(), 678 ScheduleConfigImpl(old_job.session->routing_info(),
685 old_job.session->workers(), 679 old_job.session->workers(),
686 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); 680 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION));
687 } else { 681 } else {
688 // For all other purposes(nudge and poll) we schedule a retry nudge. 682 // For all other purposes(nudge and poll) we schedule a retry nudge.
689 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), 683 ScheduleNudgeImpl(TimeDelta::FromSeconds(0),
690 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION), 684 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION),
(...skipping 27 matching lines...) Expand all
718 const SyncSessionJob& old_job) { 712 const SyncSessionJob& old_job) {
719 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 713 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
720 // This if conditions should be compiled out in retail builds. 714 // This if conditions should be compiled out in retail builds.
721 if (IsBackingOff()) { 715 if (IsBackingOff()) {
722 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); 716 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job);
723 } 717 }
724 718
725 TimeDelta length = delay_provider_->GetDelay( 719 TimeDelta length = delay_provider_->GetDelay(
726 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); 720 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1));
727 721
728 VLOG(1) << "SyncerThread(" << this << ")" 722 SVLOG(2) << "In handle continuation error. Old job purpose is "
729 << " In handle continuation error. Old job purpose is " 723 << old_job.purpose << " . The time delta(ms) is "
730 << old_job.purpose << " . The time delta(ms) is " 724 << length.InMilliseconds();
731 << length.InMilliseconds();
732 725
733 // This will reset the had_nudge variable as well. 726 // This will reset the had_nudge variable as well.
734 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, 727 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF,
735 length)); 728 length));
736 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { 729 if (old_job.purpose == SyncSessionJob::CONFIGURATION) {
737 SyncSession* old = old_job.session.get(); 730 SyncSession* old = old_job.session.get();
738 SyncSession* s(new SyncSession(session_context_.get(), this, 731 SyncSession* s(new SyncSession(session_context_.get(), this,
739 old->source(), old->routing_info(), old->workers())); 732 old->source(), old->routing_info(), old->workers()));
740 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, 733 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length,
741 make_linked_ptr(s), false, FROM_HERE); 734 make_linked_ptr(s), false, FROM_HERE);
(...skipping 27 matching lines...) Expand all
769 (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); 762 (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor));
770 763
771 // Cap the backoff interval. 764 // Cap the backoff interval.
772 backoff_s = std::max(static_cast<int64>(1), 765 backoff_s = std::max(static_cast<int64>(1),
773 std::min(backoff_s, kMaxBackoffSeconds)); 766 std::min(backoff_s, kMaxBackoffSeconds));
774 767
775 return TimeDelta::FromSeconds(backoff_s); 768 return TimeDelta::FromSeconds(backoff_s);
776 } 769 }
777 770
778 void SyncerThread::Stop() { 771 void SyncerThread::Stop() {
779 VLOG(1) << "SyncerThread(" << this << ")" << " stop called"; 772 SVLOG(2) << "stop called";
780 syncer_->RequestEarlyExit(); // Safe to call from any thread. 773 syncer_->RequestEarlyExit(); // Safe to call from any thread.
781 session_context_->connection_manager()->RemoveListener(this); 774 session_context_->connection_manager()->RemoveListener(this);
782 thread_.Stop(); 775 thread_.Stop();
783 } 776 }
784 777
785 void SyncerThread::DoCanaryJob() { 778 void SyncerThread::DoCanaryJob() {
786 VLOG(1) << "SyncerThread(" << this << ")" << " Do canary job"; 779 SVLOG(2) << "Do canary job";
787 DoPendingJobIfPossible(true); 780 DoPendingJobIfPossible(true);
788 } 781 }
789 782
790 void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) { 783 void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) {
791 SyncSessionJob* job_to_execute = NULL; 784 SyncSessionJob* job_to_execute = NULL;
792 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() 785 if (mode_ == CONFIGURATION_MODE && wait_interval_.get()
793 && wait_interval_->pending_configure_job.get()) { 786 && wait_interval_->pending_configure_job.get()) {
794 VLOG(1) << "SyncerThread(" << this << ")" << " Found pending configure job"; 787 SVLOG(2) << "Found pending configure job";
795 job_to_execute = wait_interval_->pending_configure_job.get(); 788 job_to_execute = wait_interval_->pending_configure_job.get();
796 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { 789 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) {
797 VLOG(1) << "SyncerThread(" << this << ")" << " Found pending nudge job"; 790 SVLOG(2) << "Found pending nudge job";
798 // Pending jobs mostly have time from the past. Reset it so this job 791 // Pending jobs mostly have time from the past. Reset it so this job
799 // will get executed. 792 // will get executed.
800 if (pending_nudge_->scheduled_start < TimeTicks::Now()) 793 if (pending_nudge_->scheduled_start < TimeTicks::Now())
801 pending_nudge_->scheduled_start = TimeTicks::Now(); 794 pending_nudge_->scheduled_start = TimeTicks::Now();
802 795
803 scoped_ptr<SyncSession> session(CreateSyncSession( 796 scoped_ptr<SyncSession> session(CreateSyncSession(
804 pending_nudge_->session->source())); 797 pending_nudge_->session->source()));
805 798
806 // Also the routing info might have been changed since we cached the 799 // Also the routing info might have been changed since we cached the
807 // pending nudge. Update it by coalescing to the latest. 800 // pending nudge. Update it by coalescing to the latest.
808 pending_nudge_->session->Coalesce(*(session.get())); 801 pending_nudge_->session->Coalesce(*(session.get()));
809 // The pending nudge would be cleared in the DoSyncSessionJob function. 802 // The pending nudge would be cleared in the DoSyncSessionJob function.
810 job_to_execute = pending_nudge_.get(); 803 job_to_execute = pending_nudge_.get();
811 } 804 }
812 805
813 if (job_to_execute != NULL) { 806 if (job_to_execute != NULL) {
814 VLOG(1) << "SyncerThread(" << this << ")" << " Executing pending job"; 807 SVLOG(2) << "Executing pending job";
815 SyncSessionJob copy = *job_to_execute; 808 SyncSessionJob copy = *job_to_execute;
816 copy.is_canary_job = is_canary_job; 809 copy.is_canary_job = is_canary_job;
817 DoSyncSessionJob(copy); 810 DoSyncSessionJob(copy);
818 } 811 }
819 } 812 }
820 813
821 SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) { 814 SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) {
822 ModelSafeRoutingInfo routes; 815 ModelSafeRoutingInfo routes;
823 std::vector<ModelSafeWorker*> workers; 816 std::vector<ModelSafeWorker*> workers;
824 session_context_->registrar()->GetModelSafeRoutingInfo(&routes); 817 session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
(...skipping 12 matching lines...) Expand all
837 ModelTypePayloadMap types_with_payloads = 830 ModelTypePayloadMap types_with_payloads =
838 syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); 831 syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string());
839 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); 832 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads);
840 SyncSession* s = CreateSyncSession(info); 833 SyncSession* s = CreateSyncSession(info);
841 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s, 834 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s,
842 FROM_HERE); 835 FROM_HERE);
843 } 836 }
844 837
845 void SyncerThread::Unthrottle() { 838 void SyncerThread::Unthrottle() {
846 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); 839 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode);
847 VLOG(1) << "SyncerThread(" << this << ")" << " Unthrottled.."; 840 SVLOG(2) << "Unthrottled.";
848 DoCanaryJob(); 841 DoCanaryJob();
849 wait_interval_.reset(); 842 wait_interval_.reset();
850 } 843 }
851 844
852 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { 845 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) {
853 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 846 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
854 session_context_->NotifyListeners(SyncEngineEvent(cause)); 847 session_context_->NotifyListeners(SyncEngineEvent(cause));
855 } 848 }
856 849
857 bool SyncerThread::IsBackingOff() const { 850 bool SyncerThread::IsBackingOff() const {
(...skipping 19 matching lines...) Expand all
877 syncer_short_poll_interval_seconds_ = new_interval; 870 syncer_short_poll_interval_seconds_ = new_interval;
878 } 871 }
879 872
880 void SyncerThread::OnReceivedLongPollIntervalUpdate( 873 void SyncerThread::OnReceivedLongPollIntervalUpdate(
881 const base::TimeDelta& new_interval) { 874 const base::TimeDelta& new_interval) {
882 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 875 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
883 syncer_long_poll_interval_seconds_ = new_interval; 876 syncer_long_poll_interval_seconds_ = new_interval;
884 } 877 }
885 878
886 void SyncerThread::OnShouldStopSyncingPermanently() { 879 void SyncerThread::OnShouldStopSyncingPermanently() {
887 VLOG(1) << "SyncerThread(" << this << ")" 880 SVLOG(2) << "OnShouldStopSyncingPermanently";
888 << " OnShouldStopSyncingPermanently";
889 syncer_->RequestEarlyExit(); // Thread-safe. 881 syncer_->RequestEarlyExit(); // Thread-safe.
890 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); 882 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
891 } 883 }
892 884
893 void SyncerThread::OnServerConnectionEvent( 885 void SyncerThread::OnServerConnectionEvent(
894 const ServerConnectionEvent& event) { 886 const ServerConnectionEvent& event) {
895 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, 887 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,
896 &SyncerThread::CheckServerConnectionManagerStatus, 888 &SyncerThread::CheckServerConnectionManagerStatus,
897 event.connection_code)); 889 event.connection_code));
898 } 890 }
899 891
900 void SyncerThread::set_notifications_enabled(bool notifications_enabled) { 892 void SyncerThread::set_notifications_enabled(bool notifications_enabled) {
901 session_context_->set_notifications_enabled(notifications_enabled); 893 session_context_->set_notifications_enabled(notifications_enabled);
902 } 894 }
903 895
896 #undef SVLOG
897
904 } // browser_sync 898 } // browser_sync
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698