OLD | NEW |
---|---|
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "chrome/browser/sync/engine/syncer_thread.h" | 5 #include "chrome/browser/sync/engine/syncer_thread.h" |
6 | 6 |
7 #include <algorithm> | 7 #include <algorithm> |
8 | 8 |
9 #include "base/rand_util.h" | 9 #include "base/rand_util.h" |
10 #include "chrome/browser/sync/engine/syncer.h" | 10 #include "chrome/browser/sync/engine/syncer.h" |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
72 default: | 72 default: |
73 NOTREACHED(); | 73 NOTREACHED(); |
74 } | 74 } |
75 | 75 |
76 return GetUpdatesCallerInfo::UNKNOWN; | 76 return GetUpdatesCallerInfo::UNKNOWN; |
77 } | 77 } |
78 | 78 |
79 SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | 79 SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) |
80 : mode(mode), had_nudge(false), length(length) { } | 80 : mode(mode), had_nudge(false), length(length) { } |
81 | 81 |
82 SyncerThread::SyncerThread(sessions::SyncSessionContext* context, | 82 // Helper macro to log with the syncer thread name; useful when there |
83 // are multiple syncer threads involved. | |
84 #define SVLOG(verbose_level) VLOG(verbose_level) << name_ << ": " | |
85 | |
86 SyncerThread::SyncerThread(const std::string& name, | |
87 sessions::SyncSessionContext* context, | |
83 Syncer* syncer) | 88 Syncer* syncer) |
84 : thread_("SyncEngine_SyncerThread"), | 89 : name_(name), |
90 thread_("SyncEngine_SyncerThread"), | |
85 syncer_short_poll_interval_seconds_( | 91 syncer_short_poll_interval_seconds_( |
86 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | 92 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
87 syncer_long_poll_interval_seconds_( | 93 syncer_long_poll_interval_seconds_( |
88 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | 94 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
89 mode_(NORMAL_MODE), | 95 mode_(NORMAL_MODE), |
90 server_connection_ok_(false), | 96 server_connection_ok_(false), |
91 delay_provider_(new DelayProvider()), | 97 delay_provider_(new DelayProvider()), |
92 syncer_(syncer), | 98 syncer_(syncer), |
93 session_context_(context) { | 99 session_context_(context) { |
94 } | 100 } |
95 | 101 |
96 SyncerThread::~SyncerThread() { | 102 SyncerThread::~SyncerThread() { |
97 DCHECK(!thread_.IsRunning()); | 103 DCHECK(!thread_.IsRunning()); |
98 } | 104 } |
99 | 105 |
100 void SyncerThread::CheckServerConnectionManagerStatus( | 106 void SyncerThread::CheckServerConnectionManagerStatus( |
101 HttpResponse::ServerConnectionCode code) { | 107 HttpResponse::ServerConnectionCode code) { |
108 bool old_server_connection_ok = server_connection_ok_; | |
102 | 109 |
103 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed." | |
104 << "Old mode: " << server_connection_ok_ << " Code: " << code; | |
105 // Note, be careful when adding cases here because if the SyncerThread | 110 // Note, be careful when adding cases here because if the SyncerThread |
106 // thinks there is no valid connection as determined by this method, it | 111 // thinks there is no valid connection as determined by this method, it |
107 // will drop out of *all* forward progress sync loops (it won't poll and it | 112 // will drop out of *all* forward progress sync loops (it won't poll and it |
108 // will queue up Talk notifications but not actually call SyncShare) until | 113 // will queue up Talk notifications but not actually call SyncShare) until |
109 // some external action causes a ServerConnectionManager to broadcast that | 114 // some external action causes a ServerConnectionManager to broadcast that |
110 // a valid connection has been re-established. | 115 // a valid connection has been re-established. |
111 if (HttpResponse::CONNECTION_UNAVAILABLE == code || | 116 if (HttpResponse::CONNECTION_UNAVAILABLE == code || |
112 HttpResponse::SYNC_AUTH_ERROR == code) { | 117 HttpResponse::SYNC_AUTH_ERROR == code) { |
113 server_connection_ok_ = false; | 118 server_connection_ok_ = false; |
114 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed." | |
115 << " new mode:" << server_connection_ok_; | |
116 } else if (HttpResponse::SERVER_CONNECTION_OK == code) { | 119 } else if (HttpResponse::SERVER_CONNECTION_OK == code) { |
117 server_connection_ok_ = true; | 120 server_connection_ok_ = true; |
118 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed." | |
119 << " new mode:" << server_connection_ok_; | |
120 DoCanaryJob(); | 121 DoCanaryJob(); |
121 } | 122 } |
123 | |
124 if (old_server_connection_ok != server_connection_ok_) { | |
125 SVLOG(2) << "Server connection changed. Old mode: " | |
126 << old_server_connection_ok << ", new mode: " | |
127 << server_connection_ok_ << ", code: " << code; | |
128 } | |
122 } | 129 } |
123 | 130 |
124 void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { | 131 void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { |
125 VLOG(1) << "SyncerThread(" << this << ")" << " Start called from thread " | 132 SVLOG(2) << "Start called from thread " |
126 << MessageLoop::current()->thread_name(); | 133 << MessageLoop::current()->thread_name() << " with mode " |
134 << mode; | |
127 if (!thread_.IsRunning()) { | 135 if (!thread_.IsRunning()) { |
128 VLOG(1) << "SyncerThread(" << this << ")" << " Starting thread with mode " | 136 SVLOG(2) << "Starting thread with mode " << mode; |
129 << mode; | |
130 if (!thread_.Start()) { | 137 if (!thread_.Start()) { |
131 NOTREACHED() << "Unable to start SyncerThread."; | 138 NOTREACHED() << "Unable to start SyncerThread."; |
132 return; | 139 return; |
133 } | 140 } |
134 WatchConnectionManager(); | 141 WatchConnectionManager(); |
135 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | 142 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
136 this, &SyncerThread::SendInitialSnapshot)); | 143 this, &SyncerThread::SendInitialSnapshot)); |
137 } | 144 } |
138 | 145 |
139 VLOG(1) << "SyncerThread(" << this << ")" << " Entering start with mode = " | |
140 << mode; | |
141 | |
142 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | 146 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
143 this, &SyncerThread::StartImpl, mode, callback)); | 147 this, &SyncerThread::StartImpl, mode, callback)); |
144 } | 148 } |
145 | 149 |
146 void SyncerThread::SendInitialSnapshot() { | 150 void SyncerThread::SendInitialSnapshot() { |
147 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 151 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
148 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this, | 152 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this, |
149 SyncSourceInfo(), ModelSafeRoutingInfo(), | 153 SyncSourceInfo(), ModelSafeRoutingInfo(), |
150 std::vector<ModelSafeWorker*>())); | 154 std::vector<ModelSafeWorker*>())); |
151 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); | 155 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
152 sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot()); | 156 sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot()); |
153 event.snapshot = &snapshot; | 157 event.snapshot = &snapshot; |
154 session_context_->NotifyListeners(event); | 158 session_context_->NotifyListeners(event); |
155 } | 159 } |
156 | 160 |
157 void SyncerThread::WatchConnectionManager() { | 161 void SyncerThread::WatchConnectionManager() { |
158 ServerConnectionManager* scm = session_context_->connection_manager(); | 162 ServerConnectionManager* scm = session_context_->connection_manager(); |
159 CheckServerConnectionManagerStatus(scm->server_status()); | 163 CheckServerConnectionManagerStatus(scm->server_status()); |
160 scm->AddListener(this); | 164 scm->AddListener(this); |
161 } | 165 } |
162 | 166 |
163 void SyncerThread::StartImpl(Mode mode, ModeChangeCallback* callback) { | 167 void SyncerThread::StartImpl(Mode mode, ModeChangeCallback* callback) { |
164 VLOG(1) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode " | 168 SVLOG(2) << "Doing StartImpl with mode " << mode; |
165 << mode; | |
166 | 169 |
167 // TODO(lipalani): This will leak if startimpl is never run. Fix it using a | 170 // TODO(lipalani): This will leak if startimpl is never run. Fix it using a |
168 // ThreadSafeRefcounted object. | 171 // ThreadSafeRefcounted object. |
169 scoped_ptr<ModeChangeCallback> scoped_callback(callback); | 172 scoped_ptr<ModeChangeCallback> scoped_callback(callback); |
170 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 173 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
171 DCHECK(!session_context_->account_name().empty()); | 174 DCHECK(!session_context_->account_name().empty()); |
172 DCHECK(syncer_.get()); | 175 DCHECK(syncer_.get()); |
173 mode_ = mode; | 176 mode_ = mode; |
174 AdjustPolling(NULL); // Will kick start poll timer if needed. | 177 AdjustPolling(NULL); // Will kick start poll timer if needed. |
175 if (scoped_callback.get()) | 178 if (scoped_callback.get()) |
176 scoped_callback->Run(); | 179 scoped_callback->Run(); |
177 | 180 |
178 // We just changed our mode. See if there are any pending jobs that we could | 181 // We just changed our mode. See if there are any pending jobs that we could |
179 // execute in the new mode. | 182 // execute in the new mode. |
180 DoPendingJobIfPossible(false); | 183 DoPendingJobIfPossible(false); |
181 } | 184 } |
182 | 185 |
183 SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval( | 186 SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval( |
184 const SyncSessionJob& job) { | 187 const SyncSessionJob& job) { |
185 | 188 |
186 DCHECK(wait_interval_.get()); | 189 DCHECK(wait_interval_.get()); |
187 DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); | 190 DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); |
188 | 191 |
189 VLOG(1) << "SyncerThread(" << this << ")" << " Wait interval mode : " | 192 SVLOG(2) << "Wait interval mode : " |
190 << wait_interval_->mode << "Wait interval had nudge : " | 193 << wait_interval_->mode << "Wait interval had nudge : " |
Nicolas Zea
2011/06/07 19:55:24
"Wait interval... -> ", wait interval...
"is canar
akalin
2011/06/07 20:17:38
Done.
| |
191 << wait_interval_->had_nudge << "is canary job : " | 194 << wait_interval_->had_nudge << "is canary job : " |
192 << job.is_canary_job; | 195 << job.is_canary_job; |
193 | 196 |
194 if (job.purpose == SyncSessionJob::POLL) | 197 if (job.purpose == SyncSessionJob::POLL) |
195 return DROP; | 198 return DROP; |
196 | 199 |
197 DCHECK(job.purpose == SyncSessionJob::NUDGE || | 200 DCHECK(job.purpose == SyncSessionJob::NUDGE || |
198 job.purpose == SyncSessionJob::CONFIGURATION); | 201 job.purpose == SyncSessionJob::CONFIGURATION); |
199 if (wait_interval_->mode == WaitInterval::THROTTLED) | 202 if (wait_interval_->mode == WaitInterval::THROTTLED) |
200 return SAVE; | 203 return SAVE; |
201 | 204 |
202 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); | 205 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
(...skipping 25 matching lines...) Expand all Loading... | |
228 else | 231 else |
229 return DROP; | 232 return DROP; |
230 } | 233 } |
231 | 234 |
232 // We are in normal mode. | 235 // We are in normal mode. |
233 DCHECK_EQ(mode_, NORMAL_MODE); | 236 DCHECK_EQ(mode_, NORMAL_MODE); |
234 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); | 237 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); |
235 | 238 |
236 // Freshness condition | 239 // Freshness condition |
237 if (job.scheduled_start < last_sync_session_end_time_) { | 240 if (job.scheduled_start < last_sync_session_end_time_) { |
238 VLOG(1) << "SyncerThread(" << this << ")" | 241 SVLOG(2) << "Dropping job because of freshness"; |
239 << " Dropping job because of freshness"; | |
240 return DROP; | 242 return DROP; |
241 } | 243 } |
242 | 244 |
243 if (server_connection_ok_) | 245 if (server_connection_ok_) |
244 return CONTINUE; | 246 return CONTINUE; |
245 | 247 |
246 VLOG(1) << "SyncerThread(" << this << ")" | 248 SVLOG(2) << "Bad server connection. Using that to decide on job."; |
247 << " Bad server connection. Using that to decide on job."; | |
248 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; | 249 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; |
249 } | 250 } |
250 | 251 |
251 void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) { | 252 void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) { |
252 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); | 253 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); |
253 if (pending_nudge_.get() == NULL) { | 254 if (pending_nudge_.get() == NULL) { |
254 VLOG(1) << "SyncerThread(" << this << ")" | 255 SVLOG(2) << "Creating a pending nudge job"; |
255 << " Creating a pending nudge job"; | |
256 SyncSession* s = job.session.get(); | 256 SyncSession* s = job.session.get(); |
257 scoped_ptr<SyncSession> session(new SyncSession(s->context(), | 257 scoped_ptr<SyncSession> session(new SyncSession(s->context(), |
258 s->delegate(), s->source(), s->routing_info(), s->workers())); | 258 s->delegate(), s->source(), s->routing_info(), s->workers())); |
259 | 259 |
260 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, | 260 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, |
261 make_linked_ptr(session.release()), false, job.nudge_location); | 261 make_linked_ptr(session.release()), false, job.nudge_location); |
262 pending_nudge_.reset(new SyncSessionJob(new_job)); | 262 pending_nudge_.reset(new SyncSessionJob(new_job)); |
263 | 263 |
264 return; | 264 return; |
265 } | 265 } |
266 | 266 |
267 VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge"; | 267 SVLOG(2) << "Coalescing a pending nudge"; |
268 pending_nudge_->session->Coalesce(*(job.session.get())); | 268 pending_nudge_->session->Coalesce(*(job.session.get())); |
269 pending_nudge_->scheduled_start = job.scheduled_start; | 269 pending_nudge_->scheduled_start = job.scheduled_start; |
270 | 270 |
271 // Unfortunately the nudge location cannot be modified. So it stores the | 271 // Unfortunately the nudge location cannot be modified. So it stores the |
272 // location of the first caller. | 272 // location of the first caller. |
273 } | 273 } |
274 | 274 |
275 bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) { | 275 bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) { |
276 JobProcessDecision decision = DecideOnJob(job); | 276 JobProcessDecision decision = DecideOnJob(job); |
277 VLOG(1) << "SyncerThread(" << this << ")" << " Should run job, decision: " | 277 SVLOG(2) << "Should run job, decision: " |
278 << decision << " Job purpose " << job.purpose << "mode " << mode_; | 278 << decision << " Job purpose " << job.purpose |
Nicolas Zea
2011/06/07 19:55:24
", Job purpose "
akalin
2011/06/07 20:17:38
Done.
| |
279 << ", mode " << mode_; | |
279 if (decision != SAVE) | 280 if (decision != SAVE) |
280 return decision == CONTINUE; | 281 return decision == CONTINUE; |
281 | 282 |
282 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == | 283 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == |
283 SyncSessionJob::CONFIGURATION); | 284 SyncSessionJob::CONFIGURATION); |
284 | 285 |
285 SaveJob(job); | 286 SaveJob(job); |
286 return false; | 287 return false; |
287 } | 288 } |
288 | 289 |
289 void SyncerThread::SaveJob(const SyncSessionJob& job) { | 290 void SyncerThread::SaveJob(const SyncSessionJob& job) { |
290 DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA); | 291 DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA); |
291 if (job.purpose == SyncSessionJob::NUDGE) { | 292 if (job.purpose == SyncSessionJob::NUDGE) { |
292 VLOG(1) << "SyncerThread(" << this << ")" << " Saving a nudge job"; | 293 SVLOG(2) << "Saving a nudge job"; |
293 InitOrCoalescePendingJob(job); | 294 InitOrCoalescePendingJob(job); |
294 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ | 295 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ |
295 VLOG(1) << "SyncerThread(" << this << ")" << " Saving a configuration job"; | 296 SVLOG(2) << "Saving a configuration job"; |
296 DCHECK(wait_interval_.get()); | 297 DCHECK(wait_interval_.get()); |
297 DCHECK(mode_ == CONFIGURATION_MODE); | 298 DCHECK(mode_ == CONFIGURATION_MODE); |
298 | 299 |
299 SyncSession* old = job.session.get(); | 300 SyncSession* old = job.session.get(); |
300 SyncSession* s(new SyncSession(session_context_.get(), this, | 301 SyncSession* s(new SyncSession(session_context_.get(), this, |
301 old->source(), old->routing_info(), old->workers())); | 302 old->source(), old->routing_info(), old->workers())); |
302 SyncSessionJob new_job(job.purpose, TimeTicks::Now(), | 303 SyncSessionJob new_job(job.purpose, TimeTicks::Now(), |
303 make_linked_ptr(s), false, job.nudge_location); | 304 make_linked_ptr(s), false, job.nudge_location); |
304 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); | 305 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); |
305 } // drop the rest. | 306 } // drop the rest. |
(...skipping 14 matching lines...) Expand all Loading... | |
320 return; | 321 return; |
321 } | 322 } |
322 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | 323 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
323 this, &SyncerThread::ScheduleClearUserDataImpl)); | 324 this, &SyncerThread::ScheduleClearUserDataImpl)); |
324 } | 325 } |
325 | 326 |
326 void SyncerThread::ScheduleNudge(const TimeDelta& delay, | 327 void SyncerThread::ScheduleNudge(const TimeDelta& delay, |
327 NudgeSource source, const ModelTypeBitSet& types, | 328 NudgeSource source, const ModelTypeBitSet& types, |
328 const tracked_objects::Location& nudge_location) { | 329 const tracked_objects::Location& nudge_location) { |
329 if (!thread_.IsRunning()) { | 330 if (!thread_.IsRunning()) { |
330 VLOG(0) << "Dropping nudge because thread is not running."; | 331 LOG(INFO) << "Dropping nudge because thread is not running."; |
331 NOTREACHED(); | 332 NOTREACHED(); |
332 return; | 333 return; |
333 } | 334 } |
334 | 335 |
335 VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled"; | 336 SVLOG(2) << "Nudge scheduled (source=" << source << ")"; |
336 | 337 |
337 ModelTypePayloadMap types_with_payloads = | 338 ModelTypePayloadMap types_with_payloads = |
338 syncable::ModelTypePayloadMapFromBitSet(types, std::string()); | 339 syncable::ModelTypePayloadMapFromBitSet(types, std::string()); |
339 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | 340 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
340 this, &SyncerThread::ScheduleNudgeImpl, delay, | 341 this, &SyncerThread::ScheduleNudgeImpl, delay, |
341 GetUpdatesFromNudgeSource(source), types_with_payloads, false, | 342 GetUpdatesFromNudgeSource(source), types_with_payloads, false, |
342 nudge_location)); | 343 nudge_location)); |
343 } | 344 } |
344 | 345 |
345 void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, | 346 void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, |
346 NudgeSource source, const ModelTypePayloadMap& types_with_payloads, | 347 NudgeSource source, const ModelTypePayloadMap& types_with_payloads, |
347 const tracked_objects::Location& nudge_location) { | 348 const tracked_objects::Location& nudge_location) { |
348 if (!thread_.IsRunning()) { | 349 if (!thread_.IsRunning()) { |
349 NOTREACHED(); | 350 NOTREACHED(); |
350 return; | 351 return; |
351 } | 352 } |
352 | 353 |
353 VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads"; | 354 SVLOG(2) << "Nudge scheduled with payloads (source=" << source << ")"; |
354 | 355 |
355 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | 356 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
356 this, &SyncerThread::ScheduleNudgeImpl, delay, | 357 this, &SyncerThread::ScheduleNudgeImpl, delay, |
357 GetUpdatesFromNudgeSource(source), types_with_payloads, false, | 358 GetUpdatesFromNudgeSource(source), types_with_payloads, false, |
358 nudge_location)); | 359 nudge_location)); |
359 } | 360 } |
360 | 361 |
361 void SyncerThread::ScheduleClearUserDataImpl() { | 362 void SyncerThread::ScheduleClearUserDataImpl() { |
362 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 363 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
363 SyncSession* session = new SyncSession(session_context_.get(), this, | 364 SyncSession* session = new SyncSession(session_context_.get(), this, |
364 SyncSourceInfo(), ModelSafeRoutingInfo(), | 365 SyncSourceInfo(), ModelSafeRoutingInfo(), |
365 std::vector<ModelSafeWorker*>()); | 366 std::vector<ModelSafeWorker*>()); |
366 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), | 367 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), |
367 SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE); | 368 SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE); |
368 } | 369 } |
369 | 370 |
370 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, | 371 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, |
371 GetUpdatesCallerInfo::GetUpdatesSource source, | 372 GetUpdatesCallerInfo::GetUpdatesSource source, |
372 const ModelTypePayloadMap& types_with_payloads, | 373 const ModelTypePayloadMap& types_with_payloads, |
373 bool is_canary_job, const tracked_objects::Location& nudge_location) { | 374 bool is_canary_job, const tracked_objects::Location& nudge_location) { |
374 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 375 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
375 | 376 |
376 VLOG(1) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl"; | 377 SVLOG(2) << "Running Schedule nudge impl (source=" << source << ")"; |
377 // Note we currently nudge for all types regardless of the ones incurring | 378 // Note we currently nudge for all types regardless of the ones incurring |
378 // the nudge. Doing different would throw off some syncer commands like | 379 // the nudge. Doing different would throw off some syncer commands like |
379 // CleanupDisabledTypes. We may want to change this in the future. | 380 // CleanupDisabledTypes. We may want to change this in the future. |
380 SyncSourceInfo info(source, types_with_payloads); | 381 SyncSourceInfo info(source, types_with_payloads); |
381 | 382 |
382 SyncSession* session(CreateSyncSession(info)); | 383 SyncSession* session(CreateSyncSession(info)); |
383 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, | 384 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, |
384 make_linked_ptr(session), is_canary_job, | 385 make_linked_ptr(session), is_canary_job, |
385 nudge_location); | 386 nudge_location); |
386 | 387 |
387 session = NULL; | 388 session = NULL; |
388 if (!ShouldRunJob(job)) | 389 if (!ShouldRunJob(job)) |
389 return; | 390 return; |
390 | 391 |
391 if (pending_nudge_.get()) { | 392 if (pending_nudge_.get()) { |
392 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { | 393 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { |
393 VLOG(1) << "SyncerThread(" << this << ")" << " Dropping the nudge because" | 394 SVLOG(2) << "Dropping the nudge because we are in backoff"; |
394 << "we are in backoff"; | |
395 return; | 395 return; |
396 } | 396 } |
397 | 397 |
398 VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing pending nudge"; | 398 SVLOG(2) << "Coalescing pending nudge"; |
399 pending_nudge_->session->Coalesce(*(job.session.get())); | 399 pending_nudge_->session->Coalesce(*(job.session.get())); |
400 | 400 |
401 if (!IsBackingOff()) { | 401 if (!IsBackingOff()) { |
402 VLOG(1) << "SyncerThread(" << this << ")" << " Dropping a nudge because" | 402 SVLOG(2) << "Dropping a nudge because" |
403 << " we are not in backoff and the job was coalesced"; | 403 << " we are not in backoff and the job was coalesced"; |
404 return; | 404 return; |
405 } else { | 405 } else { |
406 VLOG(1) << "SyncerThread(" << this << ")" | 406 SVLOG(2) << "Rescheduling pending nudge"; |
407 << " Rescheduling pending nudge"; | |
408 SyncSession* s = pending_nudge_->session.get(); | 407 SyncSession* s = pending_nudge_->session.get(); |
409 job.session.reset(new SyncSession(s->context(), s->delegate(), | 408 job.session.reset(new SyncSession(s->context(), s->delegate(), |
410 s->source(), s->routing_info(), s->workers())); | 409 s->source(), s->routing_info(), s->workers())); |
411 pending_nudge_.reset(); | 410 pending_nudge_.reset(); |
412 } | 411 } |
413 } | 412 } |
414 | 413 |
415 // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. | 414 // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. |
416 ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(), | 415 ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(), |
417 nudge_location); | 416 nudge_location); |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
458 if (it != w_tmp.end()) | 457 if (it != w_tmp.end()) |
459 workers->push_back(*it); | 458 workers->push_back(*it); |
460 else | 459 else |
461 NOTREACHED(); | 460 NOTREACHED(); |
462 } | 461 } |
463 } | 462 } |
464 | 463 |
465 void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types, | 464 void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types, |
466 sync_api::ConfigureReason reason) { | 465 sync_api::ConfigureReason reason) { |
467 if (!thread_.IsRunning()) { | 466 if (!thread_.IsRunning()) { |
468 VLOG(0) << "ScheduleConfig failed because thread is not running."; | 467 LOG(INFO) << "ScheduleConfig failed because thread is not running."; |
469 NOTREACHED(); | 468 NOTREACHED(); |
470 return; | 469 return; |
471 } | 470 } |
472 | 471 |
473 VLOG(1) << "SyncerThread(" << this << ")" << " Scheduling a config"; | 472 SVLOG(2) << "Scheduling a config"; |
474 ModelSafeRoutingInfo routes; | 473 ModelSafeRoutingInfo routes; |
475 std::vector<ModelSafeWorker*> workers; | 474 std::vector<ModelSafeWorker*> workers; |
476 GetModelSafeParamsForTypes(types, session_context_->registrar(), | 475 GetModelSafeParamsForTypes(types, session_context_->registrar(), |
477 &routes, &workers); | 476 &routes, &workers); |
478 | 477 |
479 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | 478 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
480 this, &SyncerThread::ScheduleConfigImpl, routes, workers, | 479 this, &SyncerThread::ScheduleConfigImpl, routes, workers, |
481 GetSourceFromReason(reason))); | 480 GetSourceFromReason(reason))); |
482 } | 481 } |
483 | 482 |
484 void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, | 483 void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, |
485 const std::vector<ModelSafeWorker*>& workers, | 484 const std::vector<ModelSafeWorker*>& workers, |
486 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { | 485 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { |
487 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 486 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
488 | 487 |
489 VLOG(1) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl..."; | 488 SVLOG(2) << "ScheduleConfigImpl..."; |
490 // TODO(tim): config-specific GetUpdatesCallerInfo value? | 489 // TODO(tim): config-specific GetUpdatesCallerInfo value? |
491 SyncSession* session = new SyncSession(session_context_.get(), this, | 490 SyncSession* session = new SyncSession(session_context_.get(), this, |
492 SyncSourceInfo(source, | 491 SyncSourceInfo(source, |
493 syncable::ModelTypePayloadMapFromRoutingInfo( | 492 syncable::ModelTypePayloadMapFromRoutingInfo( |
494 routing_info, std::string())), | 493 routing_info, std::string())), |
495 routing_info, workers); | 494 routing_info, workers); |
496 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), | 495 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), |
497 SyncSessionJob::CONFIGURATION, session, FROM_HERE); | 496 SyncSessionJob::CONFIGURATION, session, FROM_HERE); |
498 } | 497 } |
499 | 498 |
500 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, | 499 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, |
501 SyncSessionJob::SyncSessionJobPurpose purpose, | 500 SyncSessionJob::SyncSessionJobPurpose purpose, |
502 sessions::SyncSession* session, | 501 sessions::SyncSession* session, |
503 const tracked_objects::Location& nudge_location) { | 502 const tracked_objects::Location& nudge_location) { |
504 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 503 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
505 | 504 |
506 SyncSessionJob job(purpose, TimeTicks::Now() + delay, | 505 SyncSessionJob job(purpose, TimeTicks::Now() + delay, |
507 make_linked_ptr(session), false, nudge_location); | 506 make_linked_ptr(session), false, nudge_location); |
508 if (purpose == SyncSessionJob::NUDGE) { | 507 if (purpose == SyncSessionJob::NUDGE) { |
509 VLOG(1) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in" | 508 SVLOG(2) << "Resetting pending_nudge in" |
510 << " ScheduleSyncSessionJob"; | 509 << " ScheduleSyncSessionJob"; |
511 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); | 510 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); |
512 pending_nudge_.reset(new SyncSessionJob(job)); | 511 pending_nudge_.reset(new SyncSessionJob(job)); |
513 } | 512 } |
514 VLOG(1) << "SyncerThread(" << this << ")" | 513 SVLOG(2) << "Posting job to execute in DoSyncSessionJob. Job purpose " |
515 << " Posting job to execute in DoSyncSessionJob. Job purpose " | 514 << job.purpose; |
516 << job.purpose; | |
517 MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, | 515 MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, |
518 &SyncerThread::DoSyncSessionJob, job), | 516 &SyncerThread::DoSyncSessionJob, job), |
519 delay.InMilliseconds()); | 517 delay.InMilliseconds()); |
520 } | 518 } |
521 | 519 |
522 void SyncerThread::SetSyncerStepsForPurpose( | 520 void SyncerThread::SetSyncerStepsForPurpose( |
523 SyncSessionJob::SyncSessionJobPurpose purpose, | 521 SyncSessionJob::SyncSessionJobPurpose purpose, |
524 SyncerStep* start, SyncerStep* end) { | 522 SyncerStep* start, SyncerStep* end) { |
525 *end = SYNCER_END; | 523 *end = SYNCER_END; |
526 switch (purpose) { | 524 switch (purpose) { |
(...skipping 18 matching lines...) Expand all Loading... | |
545 if (!ShouldRunJob(job)) { | 543 if (!ShouldRunJob(job)) { |
546 LOG(WARNING) << "Not executing job at DoSyncSessionJob, purpose = " | 544 LOG(WARNING) << "Not executing job at DoSyncSessionJob, purpose = " |
547 << job.purpose << " source = " | 545 << job.purpose << " source = " |
548 << job.session->source().updates_source; | 546 << job.session->source().updates_source; |
549 return; | 547 return; |
550 } | 548 } |
551 | 549 |
552 if (job.purpose == SyncSessionJob::NUDGE) { | 550 if (job.purpose == SyncSessionJob::NUDGE) { |
553 if (pending_nudge_.get() == NULL || | 551 if (pending_nudge_.get() == NULL || |
554 pending_nudge_->session != job.session) { | 552 pending_nudge_->session != job.session) { |
555 VLOG(1) << "SyncerThread(" << this << ")" << "Dropping a nudge in " | 553 SVLOG(2) << "Dropping a nudge in " |
556 << "DoSyncSessionJob because another nudge was scheduled"; | 554 << "DoSyncSessionJob because another nudge was scheduled"; |
557 return; // Another nudge must have been scheduled in in the meantime. | 555 return; // Another nudge must have been scheduled in in the meantime. |
558 } | 556 } |
559 pending_nudge_.reset(); | 557 pending_nudge_.reset(); |
560 | 558 |
561 // Create the session with the latest model safe table and use it to purge | 559 // Create the session with the latest model safe table and use it to purge |
562 // and update any disabled or modified entries in the job. | 560 // and update any disabled or modified entries in the job. |
563 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); | 561 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); |
564 | 562 |
565 job.session->RebaseRoutingInfoWithLatest(session.get()); | 563 job.session->RebaseRoutingInfoWithLatest(session.get()); |
566 } | 564 } |
567 VLOG(1) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose " | 565 SVLOG(2) << "DoSyncSessionJob. job purpose " << job.purpose; |
568 << job.purpose; | |
569 | 566 |
570 SyncerStep begin(SYNCER_BEGIN); | 567 SyncerStep begin(SYNCER_BEGIN); |
571 SyncerStep end(SYNCER_END); | 568 SyncerStep end(SYNCER_END); |
572 SetSyncerStepsForPurpose(job.purpose, &begin, &end); | 569 SetSyncerStepsForPurpose(job.purpose, &begin, &end); |
573 | 570 |
574 bool has_more_to_sync = true; | 571 bool has_more_to_sync = true; |
575 while (ShouldRunJob(job) && has_more_to_sync) { | 572 while (ShouldRunJob(job) && has_more_to_sync) { |
576 VLOG(1) << "SyncerThread(" << this << ")" | 573 SVLOG(2) << "Calling SyncShare."; |
577 << " SyncerThread: Calling SyncShare."; | |
578 // Synchronously perform the sync session from this thread. | 574 // Synchronously perform the sync session from this thread. |
579 syncer_->SyncShare(job.session.get(), begin, end); | 575 syncer_->SyncShare(job.session.get(), begin, end); |
580 has_more_to_sync = job.session->HasMoreToSync(); | 576 has_more_to_sync = job.session->HasMoreToSync(); |
581 if (has_more_to_sync) | 577 if (has_more_to_sync) |
582 job.session->ResetTransientState(); | 578 job.session->ResetTransientState(); |
583 } | 579 } |
584 VLOG(1) << "SyncerThread(" << this << ")" | 580 SVLOG(2) << "Done SyncShare looping."; |
585 << " SyncerThread: Done SyncShare looping."; | |
586 FinishSyncSessionJob(job); | 581 FinishSyncSessionJob(job); |
587 } | 582 } |
588 | 583 |
589 void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) { | 584 void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) { |
590 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { | 585 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
591 // Whatever types were part of a configuration task will have had updates | 586 // Whatever types were part of a configuration task will have had updates |
592 // downloaded. For that reason, we make sure they get recorded in the | 587 // downloaded. For that reason, we make sure they get recorded in the |
593 // event that they get disabled at a later time. | 588 // event that they get disabled at a later time. |
594 ModelSafeRoutingInfo r(session_context_->previous_session_routing_info()); | 589 ModelSafeRoutingInfo r(session_context_->previous_session_routing_info()); |
595 if (!r.empty()) { | 590 if (!r.empty()) { |
(...skipping 18 matching lines...) Expand all Loading... | |
614 for (iter = job.session->source().types.begin(); | 609 for (iter = job.session->source().types.begin(); |
615 iter != job.session->source().types.end(); | 610 iter != job.session->source().types.end(); |
616 ++iter) { | 611 ++iter) { |
617 syncable::PostTimeToTypeHistogram(iter->first, | 612 syncable::PostTimeToTypeHistogram(iter->first, |
618 now - last_sync_session_end_time_); | 613 now - last_sync_session_end_time_); |
619 } | 614 } |
620 } | 615 } |
621 last_sync_session_end_time_ = now; | 616 last_sync_session_end_time_ = now; |
622 UpdateCarryoverSessionState(job); | 617 UpdateCarryoverSessionState(job); |
623 if (IsSyncingCurrentlySilenced()) { | 618 if (IsSyncingCurrentlySilenced()) { |
624 VLOG(1) << "SyncerThread(" << this << ")" | 619 SVLOG(2) << " We are currently throttled. " |
Nicolas Zea
2011/06/07 19:55:24
" We..." -> "We..."
akalin
2011/06/07 20:17:38
Done.
| |
625 << " We are currently throttled. So not scheduling the next sync."; | 620 << "So not scheduling the next sync."; |
626 SaveJob(job); | 621 SaveJob(job); |
627 return; // Nothing to do. | 622 return; // Nothing to do. |
628 } | 623 } |
629 | 624 |
630 VLOG(1) << "SyncerThread(" << this << ")" | 625 SVLOG(2) << " Updating the next polling time after SyncMain"; |
Nicolas Zea
2011/06/07 19:55:24
" Updating... -> "Updating"
akalin
2011/06/07 20:17:38
Done.
| |
631 << " Updating the next polling time after SyncMain"; | |
632 ScheduleNextSync(job); | 626 ScheduleNextSync(job); |
633 } | 627 } |
634 | 628 |
635 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { | 629 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { |
636 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 630 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
637 DCHECK(!old_job.session->HasMoreToSync()); | 631 DCHECK(!old_job.session->HasMoreToSync()); |
638 // Note: |num_server_changes_remaining| > 0 here implies that we received a | 632 // Note: |num_server_changes_remaining| > 0 here implies that we received a |
639 // broken response while trying to download all updates, because the Syncer | 633 // broken response while trying to download all updates, because the Syncer |
640 // will loop until this value is exhausted. Also, if unsynced_handles exist | 634 // will loop until this value is exhausted. Also, if unsynced_handles exist |
641 // but HasMoreToSync is false, this implies that the Syncer determined no | 635 // but HasMoreToSync is false, this implies that the Syncer determined no |
642 // forward progress was possible at this time (an error, such as an HTTP | 636 // forward progress was possible at this time (an error, such as an HTTP |
643 // 500, is likely to have occurred during commit). | 637 // 500, is likely to have occurred during commit). |
638 int num_server_changes_remaining = | |
639 old_job.session->status_controller()->num_server_changes_remaining(); | |
640 size_t num_unsynced_handles = | |
641 old_job.session->status_controller()->unsynced_handles().size(); | |
644 const bool work_to_do = | 642 const bool work_to_do = |
645 old_job.session->status_controller()->num_server_changes_remaining() > 0 | 643 num_server_changes_remaining > 0 || num_unsynced_handles > 0; |
646 || old_job.session->status_controller()->unsynced_handles().size() > 0; | 644 SVLOG(2) << "num server changes remaining: " << num_server_changes_remaining |
647 VLOG(1) << "SyncerThread(" << this << ")" << " syncer has work to do: " | 645 << ", num unsynced handles: " << num_unsynced_handles |
648 << work_to_do; | 646 << ", syncer has work to do: " << work_to_do; |
649 | 647 |
650 AdjustPolling(&old_job); | 648 AdjustPolling(&old_job); |
651 | 649 |
652 // TODO(tim): Old impl had special code if notifications disabled. Needed? | 650 // TODO(tim): Old impl had special code if notifications disabled. Needed? |
653 if (!work_to_do) { | 651 if (!work_to_do) { |
654 // Success implies backoff relief. Note that if this was a "one-off" job | 652 // Success implies backoff relief. Note that if this was a "one-off" job |
655 // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was | 653 // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was |
656 // work_to_do before it ran this wont have changed, as jobs like this don't | 654 // work_to_do before it ran this wont have changed, as jobs like this don't |
657 // run a full sync cycle. So we don't need special code here. | 655 // run a full sync cycle. So we don't need special code here. |
658 wait_interval_.reset(); | 656 wait_interval_.reset(); |
659 VLOG(1) << "SyncerThread(" << this << ")" | 657 SVLOG(2) << " Job suceeded so not scheduling more jobs"; |
660 << " Job suceeded so not scheduling more jobs"; | |
661 return; | 658 return; |
662 } | 659 } |
663 | 660 |
664 if (old_job.session->source().updates_source == | 661 if (old_job.session->source().updates_source == |
665 GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { | 662 GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { |
666 VLOG(1) << "SyncerThread(" << this << ")" | 663 SVLOG(2) << "Job failed with source continuation"; |
667 << " Job failed with source continuation"; | |
668 // We don't seem to have made forward progress. Start or extend backoff. | 664 // We don't seem to have made forward progress. Start or extend backoff. |
669 HandleConsecutiveContinuationError(old_job); | 665 HandleConsecutiveContinuationError(old_job); |
670 } else if (IsBackingOff()) { | 666 } else if (IsBackingOff()) { |
671 VLOG(1) << "SyncerThread(" << this << ")" | 667 SVLOG(2) << "A nudge during backoff failed"; |
672 << " A nudge during backoff failed"; | |
673 // We weren't continuing but we're in backoff; must have been a nudge. | 668 // We weren't continuing but we're in backoff; must have been a nudge. |
674 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); | 669 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); |
675 DCHECK(!wait_interval_->had_nudge); | 670 DCHECK(!wait_interval_->had_nudge); |
676 wait_interval_->had_nudge = true; | 671 wait_interval_->had_nudge = true; |
677 wait_interval_->timer.Reset(); | 672 wait_interval_->timer.Reset(); |
678 } else { | 673 } else { |
679 VLOG(1) << "SyncerThread(" << this << ")" | 674 SVLOG(2) << "Failed. Schedule a job with continuation as source"; |
680 << " Failed. Schedule a job with continuation as source"; | |
681 // We weren't continuing and we aren't in backoff. Schedule a normal | 675 // We weren't continuing and we aren't in backoff. Schedule a normal |
682 // continuation. | 676 // continuation. |
683 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { | 677 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
684 ScheduleConfigImpl(old_job.session->routing_info(), | 678 ScheduleConfigImpl(old_job.session->routing_info(), |
685 old_job.session->workers(), | 679 old_job.session->workers(), |
686 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); | 680 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); |
687 } else { | 681 } else { |
688 // For all other purposes(nudge and poll) we schedule a retry nudge. | 682 // For all other purposes(nudge and poll) we schedule a retry nudge. |
689 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), | 683 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), |
690 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION), | 684 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION), |
(...skipping 27 matching lines...) Expand all Loading... | |
718 const SyncSessionJob& old_job) { | 712 const SyncSessionJob& old_job) { |
719 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 713 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
720 // This if conditions should be compiled out in retail builds. | 714 // This if conditions should be compiled out in retail builds. |
721 if (IsBackingOff()) { | 715 if (IsBackingOff()) { |
722 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); | 716 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); |
723 } | 717 } |
724 | 718 |
725 TimeDelta length = delay_provider_->GetDelay( | 719 TimeDelta length = delay_provider_->GetDelay( |
726 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); | 720 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); |
727 | 721 |
728 VLOG(1) << "SyncerThread(" << this << ")" | 722 SVLOG(2) << "In handle continuation error. Old job purpose is " |
729 << " In handle continuation error. Old job purpose is " | 723 << old_job.purpose << " . The time delta(ms) is " |
730 << old_job.purpose << " . The time delta(ms) is " | 724 << length.InMilliseconds(); |
731 << length.InMilliseconds(); | |
732 | 725 |
733 // This will reset the had_nudge variable as well. | 726 // This will reset the had_nudge variable as well. |
734 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 727 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
735 length)); | 728 length)); |
736 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { | 729 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
737 SyncSession* old = old_job.session.get(); | 730 SyncSession* old = old_job.session.get(); |
738 SyncSession* s(new SyncSession(session_context_.get(), this, | 731 SyncSession* s(new SyncSession(session_context_.get(), this, |
739 old->source(), old->routing_info(), old->workers())); | 732 old->source(), old->routing_info(), old->workers())); |
740 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, | 733 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, |
741 make_linked_ptr(s), false, FROM_HERE); | 734 make_linked_ptr(s), false, FROM_HERE); |
(...skipping 27 matching lines...) Expand all Loading... | |
769 (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); | 762 (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); |
770 | 763 |
771 // Cap the backoff interval. | 764 // Cap the backoff interval. |
772 backoff_s = std::max(static_cast<int64>(1), | 765 backoff_s = std::max(static_cast<int64>(1), |
773 std::min(backoff_s, kMaxBackoffSeconds)); | 766 std::min(backoff_s, kMaxBackoffSeconds)); |
774 | 767 |
775 return TimeDelta::FromSeconds(backoff_s); | 768 return TimeDelta::FromSeconds(backoff_s); |
776 } | 769 } |
777 | 770 |
778 void SyncerThread::Stop() { | 771 void SyncerThread::Stop() { |
779 VLOG(1) << "SyncerThread(" << this << ")" << " stop called"; | 772 SVLOG(2) << "stop called"; |
780 syncer_->RequestEarlyExit(); // Safe to call from any thread. | 773 syncer_->RequestEarlyExit(); // Safe to call from any thread. |
781 session_context_->connection_manager()->RemoveListener(this); | 774 session_context_->connection_manager()->RemoveListener(this); |
782 thread_.Stop(); | 775 thread_.Stop(); |
783 } | 776 } |
784 | 777 |
785 void SyncerThread::DoCanaryJob() { | 778 void SyncerThread::DoCanaryJob() { |
786 VLOG(1) << "SyncerThread(" << this << ")" << " Do canary job"; | 779 SVLOG(2) << "Do canary job"; |
787 DoPendingJobIfPossible(true); | 780 DoPendingJobIfPossible(true); |
788 } | 781 } |
789 | 782 |
790 void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) { | 783 void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) { |
791 SyncSessionJob* job_to_execute = NULL; | 784 SyncSessionJob* job_to_execute = NULL; |
792 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() | 785 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() |
793 && wait_interval_->pending_configure_job.get()) { | 786 && wait_interval_->pending_configure_job.get()) { |
794 VLOG(1) << "SyncerThread(" << this << ")" << " Found pending configure job"; | 787 SVLOG(2) << "Found pending configure job"; |
795 job_to_execute = wait_interval_->pending_configure_job.get(); | 788 job_to_execute = wait_interval_->pending_configure_job.get(); |
796 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { | 789 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { |
797 VLOG(1) << "SyncerThread(" << this << ")" << " Found pending nudge job"; | 790 SVLOG(2) << "Found pending nudge job"; |
798 // Pending jobs mostly have time from the past. Reset it so this job | 791 // Pending jobs mostly have time from the past. Reset it so this job |
799 // will get executed. | 792 // will get executed. |
800 if (pending_nudge_->scheduled_start < TimeTicks::Now()) | 793 if (pending_nudge_->scheduled_start < TimeTicks::Now()) |
801 pending_nudge_->scheduled_start = TimeTicks::Now(); | 794 pending_nudge_->scheduled_start = TimeTicks::Now(); |
802 | 795 |
803 scoped_ptr<SyncSession> session(CreateSyncSession( | 796 scoped_ptr<SyncSession> session(CreateSyncSession( |
804 pending_nudge_->session->source())); | 797 pending_nudge_->session->source())); |
805 | 798 |
806 // Also the routing info might have been changed since we cached the | 799 // Also the routing info might have been changed since we cached the |
807 // pending nudge. Update it by coalescing to the latest. | 800 // pending nudge. Update it by coalescing to the latest. |
808 pending_nudge_->session->Coalesce(*(session.get())); | 801 pending_nudge_->session->Coalesce(*(session.get())); |
809 // The pending nudge would be cleared in the DoSyncSessionJob function. | 802 // The pending nudge would be cleared in the DoSyncSessionJob function. |
810 job_to_execute = pending_nudge_.get(); | 803 job_to_execute = pending_nudge_.get(); |
811 } | 804 } |
812 | 805 |
813 if (job_to_execute != NULL) { | 806 if (job_to_execute != NULL) { |
814 VLOG(1) << "SyncerThread(" << this << ")" << " Executing pending job"; | 807 SVLOG(2) << "Executing pending job"; |
815 SyncSessionJob copy = *job_to_execute; | 808 SyncSessionJob copy = *job_to_execute; |
816 copy.is_canary_job = is_canary_job; | 809 copy.is_canary_job = is_canary_job; |
817 DoSyncSessionJob(copy); | 810 DoSyncSessionJob(copy); |
818 } | 811 } |
819 } | 812 } |
820 | 813 |
821 SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) { | 814 SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) { |
822 ModelSafeRoutingInfo routes; | 815 ModelSafeRoutingInfo routes; |
823 std::vector<ModelSafeWorker*> workers; | 816 std::vector<ModelSafeWorker*> workers; |
824 session_context_->registrar()->GetModelSafeRoutingInfo(&routes); | 817 session_context_->registrar()->GetModelSafeRoutingInfo(&routes); |
(...skipping 12 matching lines...) Expand all Loading... | |
837 ModelTypePayloadMap types_with_payloads = | 830 ModelTypePayloadMap types_with_payloads = |
838 syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); | 831 syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); |
839 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); | 832 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); |
840 SyncSession* s = CreateSyncSession(info); | 833 SyncSession* s = CreateSyncSession(info); |
841 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s, | 834 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s, |
842 FROM_HERE); | 835 FROM_HERE); |
843 } | 836 } |
844 | 837 |
845 void SyncerThread::Unthrottle() { | 838 void SyncerThread::Unthrottle() { |
846 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | 839 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
847 VLOG(1) << "SyncerThread(" << this << ")" << " Unthrottled.."; | 840 SVLOG(2) << "Unthrottled."; |
848 DoCanaryJob(); | 841 DoCanaryJob(); |
849 wait_interval_.reset(); | 842 wait_interval_.reset(); |
850 } | 843 } |
851 | 844 |
852 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { | 845 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { |
853 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 846 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
854 session_context_->NotifyListeners(SyncEngineEvent(cause)); | 847 session_context_->NotifyListeners(SyncEngineEvent(cause)); |
855 } | 848 } |
856 | 849 |
857 bool SyncerThread::IsBackingOff() const { | 850 bool SyncerThread::IsBackingOff() const { |
(...skipping 19 matching lines...) Expand all Loading... | |
877 syncer_short_poll_interval_seconds_ = new_interval; | 870 syncer_short_poll_interval_seconds_ = new_interval; |
878 } | 871 } |
879 | 872 |
880 void SyncerThread::OnReceivedLongPollIntervalUpdate( | 873 void SyncerThread::OnReceivedLongPollIntervalUpdate( |
881 const base::TimeDelta& new_interval) { | 874 const base::TimeDelta& new_interval) { |
882 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 875 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
883 syncer_long_poll_interval_seconds_ = new_interval; | 876 syncer_long_poll_interval_seconds_ = new_interval; |
884 } | 877 } |
885 | 878 |
886 void SyncerThread::OnShouldStopSyncingPermanently() { | 879 void SyncerThread::OnShouldStopSyncingPermanently() { |
887 VLOG(1) << "SyncerThread(" << this << ")" | 880 SVLOG(2) << "OnShouldStopSyncingPermanently"; |
888 << " OnShouldStopSyncingPermanently"; | |
889 syncer_->RequestEarlyExit(); // Thread-safe. | 881 syncer_->RequestEarlyExit(); // Thread-safe. |
890 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); | 882 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); |
891 } | 883 } |
892 | 884 |
893 void SyncerThread::OnServerConnectionEvent( | 885 void SyncerThread::OnServerConnectionEvent( |
894 const ServerConnectionEvent& event) { | 886 const ServerConnectionEvent& event) { |
895 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, | 887 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, |
896 &SyncerThread::CheckServerConnectionManagerStatus, | 888 &SyncerThread::CheckServerConnectionManagerStatus, |
897 event.connection_code)); | 889 event.connection_code)); |
898 } | 890 } |
899 | 891 |
900 void SyncerThread::set_notifications_enabled(bool notifications_enabled) { | 892 void SyncerThread::set_notifications_enabled(bool notifications_enabled) { |
901 session_context_->set_notifications_enabled(notifications_enabled); | 893 session_context_->set_notifications_enabled(notifications_enabled); |
902 } | 894 } |
903 | 895 |
896 #undef SVLOG | |
897 | |
904 } // browser_sync | 898 } // browser_sync |
OLD | NEW |