Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "chrome/browser/sync/engine/syncer_thread.h" | 5 #include "chrome/browser/sync/engine/syncer_thread.h" |
| 6 | 6 |
| 7 #include <algorithm> | 7 #include <algorithm> |
| 8 | 8 |
| 9 #include "base/rand_util.h" | 9 #include "base/rand_util.h" |
| 10 #include "chrome/browser/sync/engine/syncer.h" | 10 #include "chrome/browser/sync/engine/syncer.h" |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 72 default: | 72 default: |
| 73 NOTREACHED(); | 73 NOTREACHED(); |
| 74 } | 74 } |
| 75 | 75 |
| 76 return GetUpdatesCallerInfo::UNKNOWN; | 76 return GetUpdatesCallerInfo::UNKNOWN; |
| 77 } | 77 } |
| 78 | 78 |
| 79 SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) | 79 SyncerThread::WaitInterval::WaitInterval(Mode mode, TimeDelta length) |
| 80 : mode(mode), had_nudge(false), length(length) { } | 80 : mode(mode), had_nudge(false), length(length) { } |
| 81 | 81 |
| 82 SyncerThread::SyncerThread(sessions::SyncSessionContext* context, | 82 // Helper macro to log with the syncer thread name; useful when there |
| 83 // are multiple syncer threads involved. | |
| 84 #define SVLOG(verbose_level) VLOG(verbose_level) << name_ << ": " | |
| 85 | |
| 86 SyncerThread::SyncerThread(const std::string& name, | |
| 87 sessions::SyncSessionContext* context, | |
| 83 Syncer* syncer) | 88 Syncer* syncer) |
| 84 : thread_("SyncEngine_SyncerThread"), | 89 : name_(name), |
| 90 thread_("SyncEngine_SyncerThread"), | |
| 85 syncer_short_poll_interval_seconds_( | 91 syncer_short_poll_interval_seconds_( |
| 86 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), | 92 TimeDelta::FromSeconds(kDefaultShortPollIntervalSeconds)), |
| 87 syncer_long_poll_interval_seconds_( | 93 syncer_long_poll_interval_seconds_( |
| 88 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), | 94 TimeDelta::FromSeconds(kDefaultLongPollIntervalSeconds)), |
| 89 mode_(NORMAL_MODE), | 95 mode_(NORMAL_MODE), |
| 90 server_connection_ok_(false), | 96 server_connection_ok_(false), |
| 91 delay_provider_(new DelayProvider()), | 97 delay_provider_(new DelayProvider()), |
| 92 syncer_(syncer), | 98 syncer_(syncer), |
| 93 session_context_(context) { | 99 session_context_(context) { |
| 94 } | 100 } |
| 95 | 101 |
| 96 SyncerThread::~SyncerThread() { | 102 SyncerThread::~SyncerThread() { |
| 97 DCHECK(!thread_.IsRunning()); | 103 DCHECK(!thread_.IsRunning()); |
| 98 } | 104 } |
| 99 | 105 |
| 100 void SyncerThread::CheckServerConnectionManagerStatus( | 106 void SyncerThread::CheckServerConnectionManagerStatus( |
| 101 HttpResponse::ServerConnectionCode code) { | 107 HttpResponse::ServerConnectionCode code) { |
| 108 bool old_server_connection_ok = server_connection_ok_; | |
| 102 | 109 |
| 103 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed." | |
| 104 << "Old mode: " << server_connection_ok_ << " Code: " << code; | |
| 105 // Note, be careful when adding cases here because if the SyncerThread | 110 // Note, be careful when adding cases here because if the SyncerThread |
| 106 // thinks there is no valid connection as determined by this method, it | 111 // thinks there is no valid connection as determined by this method, it |
| 107 // will drop out of *all* forward progress sync loops (it won't poll and it | 112 // will drop out of *all* forward progress sync loops (it won't poll and it |
| 108 // will queue up Talk notifications but not actually call SyncShare) until | 113 // will queue up Talk notifications but not actually call SyncShare) until |
| 109 // some external action causes a ServerConnectionManager to broadcast that | 114 // some external action causes a ServerConnectionManager to broadcast that |
| 110 // a valid connection has been re-established. | 115 // a valid connection has been re-established. |
| 111 if (HttpResponse::CONNECTION_UNAVAILABLE == code || | 116 if (HttpResponse::CONNECTION_UNAVAILABLE == code || |
| 112 HttpResponse::SYNC_AUTH_ERROR == code) { | 117 HttpResponse::SYNC_AUTH_ERROR == code) { |
| 113 server_connection_ok_ = false; | 118 server_connection_ok_ = false; |
| 114 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed." | |
| 115 << " new mode:" << server_connection_ok_; | |
| 116 } else if (HttpResponse::SERVER_CONNECTION_OK == code) { | 119 } else if (HttpResponse::SERVER_CONNECTION_OK == code) { |
| 117 server_connection_ok_ = true; | 120 server_connection_ok_ = true; |
| 118 VLOG(1) << "SyncerThread(" << this << ")" << " Server connection changed." | |
| 119 << " new mode:" << server_connection_ok_; | |
| 120 DoCanaryJob(); | 121 DoCanaryJob(); |
| 121 } | 122 } |
| 123 | |
| 124 if (old_server_connection_ok != server_connection_ok_) { | |
| 125 SVLOG(2) << "Server connection changed. Old mode: " | |
| 126 << old_server_connection_ok << ", new mode: " | |
| 127 << server_connection_ok_ << ", code: " << code; | |
| 128 } | |
| 122 } | 129 } |
| 123 | 130 |
| 124 void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { | 131 void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) { |
| 125 VLOG(1) << "SyncerThread(" << this << ")" << " Start called from thread " | 132 SVLOG(2) << "Start called from thread " |
| 126 << MessageLoop::current()->thread_name(); | 133 << MessageLoop::current()->thread_name() << " with mode " |
| 134 << mode; | |
| 127 if (!thread_.IsRunning()) { | 135 if (!thread_.IsRunning()) { |
| 128 VLOG(1) << "SyncerThread(" << this << ")" << " Starting thread with mode " | 136 SVLOG(2) << "Starting thread with mode " << mode; |
| 129 << mode; | |
| 130 if (!thread_.Start()) { | 137 if (!thread_.Start()) { |
| 131 NOTREACHED() << "Unable to start SyncerThread."; | 138 NOTREACHED() << "Unable to start SyncerThread."; |
| 132 return; | 139 return; |
| 133 } | 140 } |
| 134 WatchConnectionManager(); | 141 WatchConnectionManager(); |
| 135 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | 142 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| 136 this, &SyncerThread::SendInitialSnapshot)); | 143 this, &SyncerThread::SendInitialSnapshot)); |
| 137 } | 144 } |
| 138 | 145 |
| 139 VLOG(1) << "SyncerThread(" << this << ")" << " Entering start with mode = " | |
| 140 << mode; | |
| 141 | |
| 142 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | 146 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| 143 this, &SyncerThread::StartImpl, mode, callback)); | 147 this, &SyncerThread::StartImpl, mode, callback)); |
| 144 } | 148 } |
| 145 | 149 |
| 146 void SyncerThread::SendInitialSnapshot() { | 150 void SyncerThread::SendInitialSnapshot() { |
| 147 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 151 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| 148 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this, | 152 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this, |
| 149 SyncSourceInfo(), ModelSafeRoutingInfo(), | 153 SyncSourceInfo(), ModelSafeRoutingInfo(), |
| 150 std::vector<ModelSafeWorker*>())); | 154 std::vector<ModelSafeWorker*>())); |
| 151 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); | 155 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED); |
| 152 sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot()); | 156 sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot()); |
| 153 event.snapshot = &snapshot; | 157 event.snapshot = &snapshot; |
| 154 session_context_->NotifyListeners(event); | 158 session_context_->NotifyListeners(event); |
| 155 } | 159 } |
| 156 | 160 |
| 157 void SyncerThread::WatchConnectionManager() { | 161 void SyncerThread::WatchConnectionManager() { |
| 158 ServerConnectionManager* scm = session_context_->connection_manager(); | 162 ServerConnectionManager* scm = session_context_->connection_manager(); |
| 159 CheckServerConnectionManagerStatus(scm->server_status()); | 163 CheckServerConnectionManagerStatus(scm->server_status()); |
| 160 scm->AddListener(this); | 164 scm->AddListener(this); |
| 161 } | 165 } |
| 162 | 166 |
| 163 void SyncerThread::StartImpl(Mode mode, ModeChangeCallback* callback) { | 167 void SyncerThread::StartImpl(Mode mode, ModeChangeCallback* callback) { |
| 164 VLOG(1) << "SyncerThread(" << this << ")" << " Doing StartImpl with mode " | 168 SVLOG(2) << "Doing StartImpl with mode " << mode; |
| 165 << mode; | |
| 166 | 169 |
| 167 // TODO(lipalani): This will leak if startimpl is never run. Fix it using a | 170 // TODO(lipalani): This will leak if startimpl is never run. Fix it using a |
| 168 // ThreadSafeRefcounted object. | 171 // ThreadSafeRefcounted object. |
| 169 scoped_ptr<ModeChangeCallback> scoped_callback(callback); | 172 scoped_ptr<ModeChangeCallback> scoped_callback(callback); |
| 170 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 173 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| 171 DCHECK(!session_context_->account_name().empty()); | 174 DCHECK(!session_context_->account_name().empty()); |
| 172 DCHECK(syncer_.get()); | 175 DCHECK(syncer_.get()); |
| 173 mode_ = mode; | 176 mode_ = mode; |
| 174 AdjustPolling(NULL); // Will kick start poll timer if needed. | 177 AdjustPolling(NULL); // Will kick start poll timer if needed. |
| 175 if (scoped_callback.get()) | 178 if (scoped_callback.get()) |
| 176 scoped_callback->Run(); | 179 scoped_callback->Run(); |
| 177 | 180 |
| 178 // We just changed our mode. See if there are any pending jobs that we could | 181 // We just changed our mode. See if there are any pending jobs that we could |
| 179 // execute in the new mode. | 182 // execute in the new mode. |
| 180 DoPendingJobIfPossible(false); | 183 DoPendingJobIfPossible(false); |
| 181 } | 184 } |
| 182 | 185 |
| 183 SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval( | 186 SyncerThread::JobProcessDecision SyncerThread::DecideWhileInWaitInterval( |
| 184 const SyncSessionJob& job) { | 187 const SyncSessionJob& job) { |
| 185 | 188 |
| 186 DCHECK(wait_interval_.get()); | 189 DCHECK(wait_interval_.get()); |
| 187 DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); | 190 DCHECK_NE(job.purpose, SyncSessionJob::CLEAR_USER_DATA); |
| 188 | 191 |
| 189 VLOG(1) << "SyncerThread(" << this << ")" << " Wait interval mode : " | 192 SVLOG(2) << "Wait interval mode : " |
| 190 << wait_interval_->mode << "Wait interval had nudge : " | 193 << wait_interval_->mode << "Wait interval had nudge : " |
|
Nicolas Zea
2011/06/07 19:55:24
"Wait interval... -> ", wait interval...
"is canar
akalin
2011/06/07 20:17:38
Done.
| |
| 191 << wait_interval_->had_nudge << "is canary job : " | 194 << wait_interval_->had_nudge << "is canary job : " |
| 192 << job.is_canary_job; | 195 << job.is_canary_job; |
| 193 | 196 |
| 194 if (job.purpose == SyncSessionJob::POLL) | 197 if (job.purpose == SyncSessionJob::POLL) |
| 195 return DROP; | 198 return DROP; |
| 196 | 199 |
| 197 DCHECK(job.purpose == SyncSessionJob::NUDGE || | 200 DCHECK(job.purpose == SyncSessionJob::NUDGE || |
| 198 job.purpose == SyncSessionJob::CONFIGURATION); | 201 job.purpose == SyncSessionJob::CONFIGURATION); |
| 199 if (wait_interval_->mode == WaitInterval::THROTTLED) | 202 if (wait_interval_->mode == WaitInterval::THROTTLED) |
| 200 return SAVE; | 203 return SAVE; |
| 201 | 204 |
| 202 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); | 205 DCHECK_EQ(wait_interval_->mode, WaitInterval::EXPONENTIAL_BACKOFF); |
| (...skipping 25 matching lines...) Expand all Loading... | |
| 228 else | 231 else |
| 229 return DROP; | 232 return DROP; |
| 230 } | 233 } |
| 231 | 234 |
| 232 // We are in normal mode. | 235 // We are in normal mode. |
| 233 DCHECK_EQ(mode_, NORMAL_MODE); | 236 DCHECK_EQ(mode_, NORMAL_MODE); |
| 234 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); | 237 DCHECK_NE(job.purpose, SyncSessionJob::CONFIGURATION); |
| 235 | 238 |
| 236 // Freshness condition | 239 // Freshness condition |
| 237 if (job.scheduled_start < last_sync_session_end_time_) { | 240 if (job.scheduled_start < last_sync_session_end_time_) { |
| 238 VLOG(1) << "SyncerThread(" << this << ")" | 241 SVLOG(2) << "Dropping job because of freshness"; |
| 239 << " Dropping job because of freshness"; | |
| 240 return DROP; | 242 return DROP; |
| 241 } | 243 } |
| 242 | 244 |
| 243 if (server_connection_ok_) | 245 if (server_connection_ok_) |
| 244 return CONTINUE; | 246 return CONTINUE; |
| 245 | 247 |
| 246 VLOG(1) << "SyncerThread(" << this << ")" | 248 SVLOG(2) << "Bad server connection. Using that to decide on job."; |
| 247 << " Bad server connection. Using that to decide on job."; | |
| 248 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; | 249 return job.purpose == SyncSessionJob::NUDGE ? SAVE : DROP; |
| 249 } | 250 } |
| 250 | 251 |
| 251 void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) { | 252 void SyncerThread::InitOrCoalescePendingJob(const SyncSessionJob& job) { |
| 252 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); | 253 DCHECK(job.purpose != SyncSessionJob::CONFIGURATION); |
| 253 if (pending_nudge_.get() == NULL) { | 254 if (pending_nudge_.get() == NULL) { |
| 254 VLOG(1) << "SyncerThread(" << this << ")" | 255 SVLOG(2) << "Creating a pending nudge job"; |
| 255 << " Creating a pending nudge job"; | |
| 256 SyncSession* s = job.session.get(); | 256 SyncSession* s = job.session.get(); |
| 257 scoped_ptr<SyncSession> session(new SyncSession(s->context(), | 257 scoped_ptr<SyncSession> session(new SyncSession(s->context(), |
| 258 s->delegate(), s->source(), s->routing_info(), s->workers())); | 258 s->delegate(), s->source(), s->routing_info(), s->workers())); |
| 259 | 259 |
| 260 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, | 260 SyncSessionJob new_job(SyncSessionJob::NUDGE, job.scheduled_start, |
| 261 make_linked_ptr(session.release()), false, job.nudge_location); | 261 make_linked_ptr(session.release()), false, job.nudge_location); |
| 262 pending_nudge_.reset(new SyncSessionJob(new_job)); | 262 pending_nudge_.reset(new SyncSessionJob(new_job)); |
| 263 | 263 |
| 264 return; | 264 return; |
| 265 } | 265 } |
| 266 | 266 |
| 267 VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing a pending nudge"; | 267 SVLOG(2) << "Coalescing a pending nudge"; |
| 268 pending_nudge_->session->Coalesce(*(job.session.get())); | 268 pending_nudge_->session->Coalesce(*(job.session.get())); |
| 269 pending_nudge_->scheduled_start = job.scheduled_start; | 269 pending_nudge_->scheduled_start = job.scheduled_start; |
| 270 | 270 |
| 271 // Unfortunately the nudge location cannot be modified. So it stores the | 271 // Unfortunately the nudge location cannot be modified. So it stores the |
| 272 // location of the first caller. | 272 // location of the first caller. |
| 273 } | 273 } |
| 274 | 274 |
| 275 bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) { | 275 bool SyncerThread::ShouldRunJob(const SyncSessionJob& job) { |
| 276 JobProcessDecision decision = DecideOnJob(job); | 276 JobProcessDecision decision = DecideOnJob(job); |
| 277 VLOG(1) << "SyncerThread(" << this << ")" << " Should run job, decision: " | 277 SVLOG(2) << "Should run job, decision: " |
| 278 << decision << " Job purpose " << job.purpose << "mode " << mode_; | 278 << decision << " Job purpose " << job.purpose |
|
Nicolas Zea
2011/06/07 19:55:24
", Job purpose "
akalin
2011/06/07 20:17:38
Done.
| |
| 279 << ", mode " << mode_; | |
| 279 if (decision != SAVE) | 280 if (decision != SAVE) |
| 280 return decision == CONTINUE; | 281 return decision == CONTINUE; |
| 281 | 282 |
| 282 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == | 283 DCHECK(job.purpose == SyncSessionJob::NUDGE || job.purpose == |
| 283 SyncSessionJob::CONFIGURATION); | 284 SyncSessionJob::CONFIGURATION); |
| 284 | 285 |
| 285 SaveJob(job); | 286 SaveJob(job); |
| 286 return false; | 287 return false; |
| 287 } | 288 } |
| 288 | 289 |
| 289 void SyncerThread::SaveJob(const SyncSessionJob& job) { | 290 void SyncerThread::SaveJob(const SyncSessionJob& job) { |
| 290 DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA); | 291 DCHECK(job.purpose != SyncSessionJob::CLEAR_USER_DATA); |
| 291 if (job.purpose == SyncSessionJob::NUDGE) { | 292 if (job.purpose == SyncSessionJob::NUDGE) { |
| 292 VLOG(1) << "SyncerThread(" << this << ")" << " Saving a nudge job"; | 293 SVLOG(2) << "Saving a nudge job"; |
| 293 InitOrCoalescePendingJob(job); | 294 InitOrCoalescePendingJob(job); |
| 294 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ | 295 } else if (job.purpose == SyncSessionJob::CONFIGURATION){ |
| 295 VLOG(1) << "SyncerThread(" << this << ")" << " Saving a configuration job"; | 296 SVLOG(2) << "Saving a configuration job"; |
| 296 DCHECK(wait_interval_.get()); | 297 DCHECK(wait_interval_.get()); |
| 297 DCHECK(mode_ == CONFIGURATION_MODE); | 298 DCHECK(mode_ == CONFIGURATION_MODE); |
| 298 | 299 |
| 299 SyncSession* old = job.session.get(); | 300 SyncSession* old = job.session.get(); |
| 300 SyncSession* s(new SyncSession(session_context_.get(), this, | 301 SyncSession* s(new SyncSession(session_context_.get(), this, |
| 301 old->source(), old->routing_info(), old->workers())); | 302 old->source(), old->routing_info(), old->workers())); |
| 302 SyncSessionJob new_job(job.purpose, TimeTicks::Now(), | 303 SyncSessionJob new_job(job.purpose, TimeTicks::Now(), |
| 303 make_linked_ptr(s), false, job.nudge_location); | 304 make_linked_ptr(s), false, job.nudge_location); |
| 304 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); | 305 wait_interval_->pending_configure_job.reset(new SyncSessionJob(new_job)); |
| 305 } // drop the rest. | 306 } // drop the rest. |
| (...skipping 14 matching lines...) Expand all Loading... | |
| 320 return; | 321 return; |
| 321 } | 322 } |
| 322 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | 323 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| 323 this, &SyncerThread::ScheduleClearUserDataImpl)); | 324 this, &SyncerThread::ScheduleClearUserDataImpl)); |
| 324 } | 325 } |
| 325 | 326 |
| 326 void SyncerThread::ScheduleNudge(const TimeDelta& delay, | 327 void SyncerThread::ScheduleNudge(const TimeDelta& delay, |
| 327 NudgeSource source, const ModelTypeBitSet& types, | 328 NudgeSource source, const ModelTypeBitSet& types, |
| 328 const tracked_objects::Location& nudge_location) { | 329 const tracked_objects::Location& nudge_location) { |
| 329 if (!thread_.IsRunning()) { | 330 if (!thread_.IsRunning()) { |
| 330 VLOG(0) << "Dropping nudge because thread is not running."; | 331 LOG(INFO) << "Dropping nudge because thread is not running."; |
| 331 NOTREACHED(); | 332 NOTREACHED(); |
| 332 return; | 333 return; |
| 333 } | 334 } |
| 334 | 335 |
| 335 VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled"; | 336 SVLOG(2) << "Nudge scheduled (source=" << source << ")"; |
| 336 | 337 |
| 337 ModelTypePayloadMap types_with_payloads = | 338 ModelTypePayloadMap types_with_payloads = |
| 338 syncable::ModelTypePayloadMapFromBitSet(types, std::string()); | 339 syncable::ModelTypePayloadMapFromBitSet(types, std::string()); |
| 339 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | 340 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| 340 this, &SyncerThread::ScheduleNudgeImpl, delay, | 341 this, &SyncerThread::ScheduleNudgeImpl, delay, |
| 341 GetUpdatesFromNudgeSource(source), types_with_payloads, false, | 342 GetUpdatesFromNudgeSource(source), types_with_payloads, false, |
| 342 nudge_location)); | 343 nudge_location)); |
| 343 } | 344 } |
| 344 | 345 |
| 345 void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, | 346 void SyncerThread::ScheduleNudgeWithPayloads(const TimeDelta& delay, |
| 346 NudgeSource source, const ModelTypePayloadMap& types_with_payloads, | 347 NudgeSource source, const ModelTypePayloadMap& types_with_payloads, |
| 347 const tracked_objects::Location& nudge_location) { | 348 const tracked_objects::Location& nudge_location) { |
| 348 if (!thread_.IsRunning()) { | 349 if (!thread_.IsRunning()) { |
| 349 NOTREACHED(); | 350 NOTREACHED(); |
| 350 return; | 351 return; |
| 351 } | 352 } |
| 352 | 353 |
| 353 VLOG(1) << "SyncerThread(" << this << ")" << " Nudge scheduled with payloads"; | 354 SVLOG(2) << "Nudge scheduled with payloads (source=" << source << ")"; |
| 354 | 355 |
| 355 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | 356 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| 356 this, &SyncerThread::ScheduleNudgeImpl, delay, | 357 this, &SyncerThread::ScheduleNudgeImpl, delay, |
| 357 GetUpdatesFromNudgeSource(source), types_with_payloads, false, | 358 GetUpdatesFromNudgeSource(source), types_with_payloads, false, |
| 358 nudge_location)); | 359 nudge_location)); |
| 359 } | 360 } |
| 360 | 361 |
| 361 void SyncerThread::ScheduleClearUserDataImpl() { | 362 void SyncerThread::ScheduleClearUserDataImpl() { |
| 362 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 363 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| 363 SyncSession* session = new SyncSession(session_context_.get(), this, | 364 SyncSession* session = new SyncSession(session_context_.get(), this, |
| 364 SyncSourceInfo(), ModelSafeRoutingInfo(), | 365 SyncSourceInfo(), ModelSafeRoutingInfo(), |
| 365 std::vector<ModelSafeWorker*>()); | 366 std::vector<ModelSafeWorker*>()); |
| 366 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), | 367 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), |
| 367 SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE); | 368 SyncSessionJob::CLEAR_USER_DATA, session, FROM_HERE); |
| 368 } | 369 } |
| 369 | 370 |
| 370 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, | 371 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, |
| 371 GetUpdatesCallerInfo::GetUpdatesSource source, | 372 GetUpdatesCallerInfo::GetUpdatesSource source, |
| 372 const ModelTypePayloadMap& types_with_payloads, | 373 const ModelTypePayloadMap& types_with_payloads, |
| 373 bool is_canary_job, const tracked_objects::Location& nudge_location) { | 374 bool is_canary_job, const tracked_objects::Location& nudge_location) { |
| 374 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 375 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| 375 | 376 |
| 376 VLOG(1) << "SyncerThread(" << this << ")" << " Running Schedule nudge impl"; | 377 SVLOG(2) << "Running Schedule nudge impl (source=" << source << ")"; |
| 377 // Note we currently nudge for all types regardless of the ones incurring | 378 // Note we currently nudge for all types regardless of the ones incurring |
| 378 // the nudge. Doing different would throw off some syncer commands like | 379 // the nudge. Doing different would throw off some syncer commands like |
| 379 // CleanupDisabledTypes. We may want to change this in the future. | 380 // CleanupDisabledTypes. We may want to change this in the future. |
| 380 SyncSourceInfo info(source, types_with_payloads); | 381 SyncSourceInfo info(source, types_with_payloads); |
| 381 | 382 |
| 382 SyncSession* session(CreateSyncSession(info)); | 383 SyncSession* session(CreateSyncSession(info)); |
| 383 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, | 384 SyncSessionJob job(SyncSessionJob::NUDGE, TimeTicks::Now() + delay, |
| 384 make_linked_ptr(session), is_canary_job, | 385 make_linked_ptr(session), is_canary_job, |
| 385 nudge_location); | 386 nudge_location); |
| 386 | 387 |
| 387 session = NULL; | 388 session = NULL; |
| 388 if (!ShouldRunJob(job)) | 389 if (!ShouldRunJob(job)) |
| 389 return; | 390 return; |
| 390 | 391 |
| 391 if (pending_nudge_.get()) { | 392 if (pending_nudge_.get()) { |
| 392 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { | 393 if (IsBackingOff() && delay > TimeDelta::FromSeconds(1)) { |
| 393 VLOG(1) << "SyncerThread(" << this << ")" << " Dropping the nudge because" | 394 SVLOG(2) << "Dropping the nudge because we are in backoff"; |
| 394 << "we are in backoff"; | |
| 395 return; | 395 return; |
| 396 } | 396 } |
| 397 | 397 |
| 398 VLOG(1) << "SyncerThread(" << this << ")" << " Coalescing pending nudge"; | 398 SVLOG(2) << "Coalescing pending nudge"; |
| 399 pending_nudge_->session->Coalesce(*(job.session.get())); | 399 pending_nudge_->session->Coalesce(*(job.session.get())); |
| 400 | 400 |
| 401 if (!IsBackingOff()) { | 401 if (!IsBackingOff()) { |
| 402 VLOG(1) << "SyncerThread(" << this << ")" << " Dropping a nudge because" | 402 SVLOG(2) << "Dropping a nudge because" |
| 403 << " we are not in backoff and the job was coalesced"; | 403 << " we are not in backoff and the job was coalesced"; |
| 404 return; | 404 return; |
| 405 } else { | 405 } else { |
| 406 VLOG(1) << "SyncerThread(" << this << ")" | 406 SVLOG(2) << "Rescheduling pending nudge"; |
| 407 << " Rescheduling pending nudge"; | |
| 408 SyncSession* s = pending_nudge_->session.get(); | 407 SyncSession* s = pending_nudge_->session.get(); |
| 409 job.session.reset(new SyncSession(s->context(), s->delegate(), | 408 job.session.reset(new SyncSession(s->context(), s->delegate(), |
| 410 s->source(), s->routing_info(), s->workers())); | 409 s->source(), s->routing_info(), s->workers())); |
| 411 pending_nudge_.reset(); | 410 pending_nudge_.reset(); |
| 412 } | 411 } |
| 413 } | 412 } |
| 414 | 413 |
| 415 // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. | 414 // TODO(lipalani) - pass the job itself to ScheduleSyncSessionJob. |
| 416 ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(), | 415 ScheduleSyncSessionJob(delay, SyncSessionJob::NUDGE, job.session.release(), |
| 417 nudge_location); | 416 nudge_location); |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 458 if (it != w_tmp.end()) | 457 if (it != w_tmp.end()) |
| 459 workers->push_back(*it); | 458 workers->push_back(*it); |
| 460 else | 459 else |
| 461 NOTREACHED(); | 460 NOTREACHED(); |
| 462 } | 461 } |
| 463 } | 462 } |
| 464 | 463 |
| 465 void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types, | 464 void SyncerThread::ScheduleConfig(const ModelTypeBitSet& types, |
| 466 sync_api::ConfigureReason reason) { | 465 sync_api::ConfigureReason reason) { |
| 467 if (!thread_.IsRunning()) { | 466 if (!thread_.IsRunning()) { |
| 468 VLOG(0) << "ScheduleConfig failed because thread is not running."; | 467 LOG(INFO) << "ScheduleConfig failed because thread is not running."; |
| 469 NOTREACHED(); | 468 NOTREACHED(); |
| 470 return; | 469 return; |
| 471 } | 470 } |
| 472 | 471 |
| 473 VLOG(1) << "SyncerThread(" << this << ")" << " Scheduling a config"; | 472 SVLOG(2) << "Scheduling a config"; |
| 474 ModelSafeRoutingInfo routes; | 473 ModelSafeRoutingInfo routes; |
| 475 std::vector<ModelSafeWorker*> workers; | 474 std::vector<ModelSafeWorker*> workers; |
| 476 GetModelSafeParamsForTypes(types, session_context_->registrar(), | 475 GetModelSafeParamsForTypes(types, session_context_->registrar(), |
| 477 &routes, &workers); | 476 &routes, &workers); |
| 478 | 477 |
| 479 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( | 478 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( |
| 480 this, &SyncerThread::ScheduleConfigImpl, routes, workers, | 479 this, &SyncerThread::ScheduleConfigImpl, routes, workers, |
| 481 GetSourceFromReason(reason))); | 480 GetSourceFromReason(reason))); |
| 482 } | 481 } |
| 483 | 482 |
| 484 void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, | 483 void SyncerThread::ScheduleConfigImpl(const ModelSafeRoutingInfo& routing_info, |
| 485 const std::vector<ModelSafeWorker*>& workers, | 484 const std::vector<ModelSafeWorker*>& workers, |
| 486 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { | 485 const sync_pb::GetUpdatesCallerInfo::GetUpdatesSource source) { |
| 487 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 486 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| 488 | 487 |
| 489 VLOG(1) << "SyncerThread(" << this << ")" << " ScheduleConfigImpl..."; | 488 SVLOG(2) << "ScheduleConfigImpl..."; |
| 490 // TODO(tim): config-specific GetUpdatesCallerInfo value? | 489 // TODO(tim): config-specific GetUpdatesCallerInfo value? |
| 491 SyncSession* session = new SyncSession(session_context_.get(), this, | 490 SyncSession* session = new SyncSession(session_context_.get(), this, |
| 492 SyncSourceInfo(source, | 491 SyncSourceInfo(source, |
| 493 syncable::ModelTypePayloadMapFromRoutingInfo( | 492 syncable::ModelTypePayloadMapFromRoutingInfo( |
| 494 routing_info, std::string())), | 493 routing_info, std::string())), |
| 495 routing_info, workers); | 494 routing_info, workers); |
| 496 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), | 495 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), |
| 497 SyncSessionJob::CONFIGURATION, session, FROM_HERE); | 496 SyncSessionJob::CONFIGURATION, session, FROM_HERE); |
| 498 } | 497 } |
| 499 | 498 |
| 500 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, | 499 void SyncerThread::ScheduleSyncSessionJob(const base::TimeDelta& delay, |
| 501 SyncSessionJob::SyncSessionJobPurpose purpose, | 500 SyncSessionJob::SyncSessionJobPurpose purpose, |
| 502 sessions::SyncSession* session, | 501 sessions::SyncSession* session, |
| 503 const tracked_objects::Location& nudge_location) { | 502 const tracked_objects::Location& nudge_location) { |
| 504 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 503 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| 505 | 504 |
| 506 SyncSessionJob job(purpose, TimeTicks::Now() + delay, | 505 SyncSessionJob job(purpose, TimeTicks::Now() + delay, |
| 507 make_linked_ptr(session), false, nudge_location); | 506 make_linked_ptr(session), false, nudge_location); |
| 508 if (purpose == SyncSessionJob::NUDGE) { | 507 if (purpose == SyncSessionJob::NUDGE) { |
| 509 VLOG(1) << "SyncerThread(" << this << ")" << " Resetting pending_nudge in" | 508 SVLOG(2) << "Resetting pending_nudge in" |
| 510 << " ScheduleSyncSessionJob"; | 509 << " ScheduleSyncSessionJob"; |
| 511 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); | 510 DCHECK(!pending_nudge_.get() || pending_nudge_->session.get() == session); |
| 512 pending_nudge_.reset(new SyncSessionJob(job)); | 511 pending_nudge_.reset(new SyncSessionJob(job)); |
| 513 } | 512 } |
| 514 VLOG(1) << "SyncerThread(" << this << ")" | 513 SVLOG(2) << "Posting job to execute in DoSyncSessionJob. Job purpose " |
| 515 << " Posting job to execute in DoSyncSessionJob. Job purpose " | 514 << job.purpose; |
| 516 << job.purpose; | |
| 517 MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, | 515 MessageLoop::current()->PostDelayedTask(FROM_HERE, NewRunnableMethod(this, |
| 518 &SyncerThread::DoSyncSessionJob, job), | 516 &SyncerThread::DoSyncSessionJob, job), |
| 519 delay.InMilliseconds()); | 517 delay.InMilliseconds()); |
| 520 } | 518 } |
| 521 | 519 |
| 522 void SyncerThread::SetSyncerStepsForPurpose( | 520 void SyncerThread::SetSyncerStepsForPurpose( |
| 523 SyncSessionJob::SyncSessionJobPurpose purpose, | 521 SyncSessionJob::SyncSessionJobPurpose purpose, |
| 524 SyncerStep* start, SyncerStep* end) { | 522 SyncerStep* start, SyncerStep* end) { |
| 525 *end = SYNCER_END; | 523 *end = SYNCER_END; |
| 526 switch (purpose) { | 524 switch (purpose) { |
| (...skipping 18 matching lines...) Expand all Loading... | |
| 545 if (!ShouldRunJob(job)) { | 543 if (!ShouldRunJob(job)) { |
| 546 LOG(WARNING) << "Not executing job at DoSyncSessionJob, purpose = " | 544 LOG(WARNING) << "Not executing job at DoSyncSessionJob, purpose = " |
| 547 << job.purpose << " source = " | 545 << job.purpose << " source = " |
| 548 << job.session->source().updates_source; | 546 << job.session->source().updates_source; |
| 549 return; | 547 return; |
| 550 } | 548 } |
| 551 | 549 |
| 552 if (job.purpose == SyncSessionJob::NUDGE) { | 550 if (job.purpose == SyncSessionJob::NUDGE) { |
| 553 if (pending_nudge_.get() == NULL || | 551 if (pending_nudge_.get() == NULL || |
| 554 pending_nudge_->session != job.session) { | 552 pending_nudge_->session != job.session) { |
| 555 VLOG(1) << "SyncerThread(" << this << ")" << "Dropping a nudge in " | 553 SVLOG(2) << "Dropping a nudge in " |
| 556 << "DoSyncSessionJob because another nudge was scheduled"; | 554 << "DoSyncSessionJob because another nudge was scheduled"; |
| 557 return; // Another nudge must have been scheduled in in the meantime. | 555 return; // Another nudge must have been scheduled in in the meantime. |
| 558 } | 556 } |
| 559 pending_nudge_.reset(); | 557 pending_nudge_.reset(); |
| 560 | 558 |
| 561 // Create the session with the latest model safe table and use it to purge | 559 // Create the session with the latest model safe table and use it to purge |
| 562 // and update any disabled or modified entries in the job. | 560 // and update any disabled or modified entries in the job. |
| 563 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); | 561 scoped_ptr<SyncSession> session(CreateSyncSession(job.session->source())); |
| 564 | 562 |
| 565 job.session->RebaseRoutingInfoWithLatest(session.get()); | 563 job.session->RebaseRoutingInfoWithLatest(session.get()); |
| 566 } | 564 } |
| 567 VLOG(1) << "SyncerThread(" << this << ")" << " DoSyncSessionJob. job purpose " | 565 SVLOG(2) << "DoSyncSessionJob. job purpose " << job.purpose; |
| 568 << job.purpose; | |
| 569 | 566 |
| 570 SyncerStep begin(SYNCER_BEGIN); | 567 SyncerStep begin(SYNCER_BEGIN); |
| 571 SyncerStep end(SYNCER_END); | 568 SyncerStep end(SYNCER_END); |
| 572 SetSyncerStepsForPurpose(job.purpose, &begin, &end); | 569 SetSyncerStepsForPurpose(job.purpose, &begin, &end); |
| 573 | 570 |
| 574 bool has_more_to_sync = true; | 571 bool has_more_to_sync = true; |
| 575 while (ShouldRunJob(job) && has_more_to_sync) { | 572 while (ShouldRunJob(job) && has_more_to_sync) { |
| 576 VLOG(1) << "SyncerThread(" << this << ")" | 573 SVLOG(2) << "Calling SyncShare."; |
| 577 << " SyncerThread: Calling SyncShare."; | |
| 578 // Synchronously perform the sync session from this thread. | 574 // Synchronously perform the sync session from this thread. |
| 579 syncer_->SyncShare(job.session.get(), begin, end); | 575 syncer_->SyncShare(job.session.get(), begin, end); |
| 580 has_more_to_sync = job.session->HasMoreToSync(); | 576 has_more_to_sync = job.session->HasMoreToSync(); |
| 581 if (has_more_to_sync) | 577 if (has_more_to_sync) |
| 582 job.session->ResetTransientState(); | 578 job.session->ResetTransientState(); |
| 583 } | 579 } |
| 584 VLOG(1) << "SyncerThread(" << this << ")" | 580 SVLOG(2) << "Done SyncShare looping."; |
| 585 << " SyncerThread: Done SyncShare looping."; | |
| 586 FinishSyncSessionJob(job); | 581 FinishSyncSessionJob(job); |
| 587 } | 582 } |
| 588 | 583 |
| 589 void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) { | 584 void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) { |
| 590 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { | 585 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
| 591 // Whatever types were part of a configuration task will have had updates | 586 // Whatever types were part of a configuration task will have had updates |
| 592 // downloaded. For that reason, we make sure they get recorded in the | 587 // downloaded. For that reason, we make sure they get recorded in the |
| 593 // event that they get disabled at a later time. | 588 // event that they get disabled at a later time. |
| 594 ModelSafeRoutingInfo r(session_context_->previous_session_routing_info()); | 589 ModelSafeRoutingInfo r(session_context_->previous_session_routing_info()); |
| 595 if (!r.empty()) { | 590 if (!r.empty()) { |
| (...skipping 18 matching lines...) Expand all Loading... | |
| 614 for (iter = job.session->source().types.begin(); | 609 for (iter = job.session->source().types.begin(); |
| 615 iter != job.session->source().types.end(); | 610 iter != job.session->source().types.end(); |
| 616 ++iter) { | 611 ++iter) { |
| 617 syncable::PostTimeToTypeHistogram(iter->first, | 612 syncable::PostTimeToTypeHistogram(iter->first, |
| 618 now - last_sync_session_end_time_); | 613 now - last_sync_session_end_time_); |
| 619 } | 614 } |
| 620 } | 615 } |
| 621 last_sync_session_end_time_ = now; | 616 last_sync_session_end_time_ = now; |
| 622 UpdateCarryoverSessionState(job); | 617 UpdateCarryoverSessionState(job); |
| 623 if (IsSyncingCurrentlySilenced()) { | 618 if (IsSyncingCurrentlySilenced()) { |
| 624 VLOG(1) << "SyncerThread(" << this << ")" | 619 SVLOG(2) << " We are currently throttled. " |
|
Nicolas Zea
2011/06/07 19:55:24
" We..." -> "We..."
akalin
2011/06/07 20:17:38
Done.
| |
| 625 << " We are currently throttled. So not scheduling the next sync."; | 620 << "So not scheduling the next sync."; |
| 626 SaveJob(job); | 621 SaveJob(job); |
| 627 return; // Nothing to do. | 622 return; // Nothing to do. |
| 628 } | 623 } |
| 629 | 624 |
| 630 VLOG(1) << "SyncerThread(" << this << ")" | 625 SVLOG(2) << " Updating the next polling time after SyncMain"; |
|
Nicolas Zea
2011/06/07 19:55:24
" Updating... -> "Updating"
akalin
2011/06/07 20:17:38
Done.
| |
| 631 << " Updating the next polling time after SyncMain"; | |
| 632 ScheduleNextSync(job); | 626 ScheduleNextSync(job); |
| 633 } | 627 } |
| 634 | 628 |
| 635 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { | 629 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { |
| 636 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 630 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| 637 DCHECK(!old_job.session->HasMoreToSync()); | 631 DCHECK(!old_job.session->HasMoreToSync()); |
| 638 // Note: |num_server_changes_remaining| > 0 here implies that we received a | 632 // Note: |num_server_changes_remaining| > 0 here implies that we received a |
| 639 // broken response while trying to download all updates, because the Syncer | 633 // broken response while trying to download all updates, because the Syncer |
| 640 // will loop until this value is exhausted. Also, if unsynced_handles exist | 634 // will loop until this value is exhausted. Also, if unsynced_handles exist |
| 641 // but HasMoreToSync is false, this implies that the Syncer determined no | 635 // but HasMoreToSync is false, this implies that the Syncer determined no |
| 642 // forward progress was possible at this time (an error, such as an HTTP | 636 // forward progress was possible at this time (an error, such as an HTTP |
| 643 // 500, is likely to have occurred during commit). | 637 // 500, is likely to have occurred during commit). |
| 638 int num_server_changes_remaining = | |
| 639 old_job.session->status_controller()->num_server_changes_remaining(); | |
| 640 size_t num_unsynced_handles = | |
| 641 old_job.session->status_controller()->unsynced_handles().size(); | |
| 644 const bool work_to_do = | 642 const bool work_to_do = |
| 645 old_job.session->status_controller()->num_server_changes_remaining() > 0 | 643 num_server_changes_remaining > 0 || num_unsynced_handles > 0; |
| 646 || old_job.session->status_controller()->unsynced_handles().size() > 0; | 644 SVLOG(2) << "num server changes remaining: " << num_server_changes_remaining |
| 647 VLOG(1) << "SyncerThread(" << this << ")" << " syncer has work to do: " | 645 << ", num unsynced handles: " << num_unsynced_handles |
| 648 << work_to_do; | 646 << ", syncer has work to do: " << work_to_do; |
| 649 | 647 |
| 650 AdjustPolling(&old_job); | 648 AdjustPolling(&old_job); |
| 651 | 649 |
| 652 // TODO(tim): Old impl had special code if notifications disabled. Needed? | 650 // TODO(tim): Old impl had special code if notifications disabled. Needed? |
| 653 if (!work_to_do) { | 651 if (!work_to_do) { |
| 654 // Success implies backoff relief. Note that if this was a "one-off" job | 652 // Success implies backoff relief. Note that if this was a "one-off" job |
| 655 // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was | 653 // (i.e. purpose == SyncSessionJob::CLEAR_USER_DATA), if there was |
| 656 // work_to_do before it ran this wont have changed, as jobs like this don't | 654 // work_to_do before it ran this wont have changed, as jobs like this don't |
| 657 // run a full sync cycle. So we don't need special code here. | 655 // run a full sync cycle. So we don't need special code here. |
| 658 wait_interval_.reset(); | 656 wait_interval_.reset(); |
| 659 VLOG(1) << "SyncerThread(" << this << ")" | 657 SVLOG(2) << " Job suceeded so not scheduling more jobs"; |
| 660 << " Job suceeded so not scheduling more jobs"; | |
| 661 return; | 658 return; |
| 662 } | 659 } |
| 663 | 660 |
| 664 if (old_job.session->source().updates_source == | 661 if (old_job.session->source().updates_source == |
| 665 GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { | 662 GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION) { |
| 666 VLOG(1) << "SyncerThread(" << this << ")" | 663 SVLOG(2) << "Job failed with source continuation"; |
| 667 << " Job failed with source continuation"; | |
| 668 // We don't seem to have made forward progress. Start or extend backoff. | 664 // We don't seem to have made forward progress. Start or extend backoff. |
| 669 HandleConsecutiveContinuationError(old_job); | 665 HandleConsecutiveContinuationError(old_job); |
| 670 } else if (IsBackingOff()) { | 666 } else if (IsBackingOff()) { |
| 671 VLOG(1) << "SyncerThread(" << this << ")" | 667 SVLOG(2) << "A nudge during backoff failed"; |
| 672 << " A nudge during backoff failed"; | |
| 673 // We weren't continuing but we're in backoff; must have been a nudge. | 668 // We weren't continuing but we're in backoff; must have been a nudge. |
| 674 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); | 669 DCHECK_EQ(SyncSessionJob::NUDGE, old_job.purpose); |
| 675 DCHECK(!wait_interval_->had_nudge); | 670 DCHECK(!wait_interval_->had_nudge); |
| 676 wait_interval_->had_nudge = true; | 671 wait_interval_->had_nudge = true; |
| 677 wait_interval_->timer.Reset(); | 672 wait_interval_->timer.Reset(); |
| 678 } else { | 673 } else { |
| 679 VLOG(1) << "SyncerThread(" << this << ")" | 674 SVLOG(2) << "Failed. Schedule a job with continuation as source"; |
| 680 << " Failed. Schedule a job with continuation as source"; | |
| 681 // We weren't continuing and we aren't in backoff. Schedule a normal | 675 // We weren't continuing and we aren't in backoff. Schedule a normal |
| 682 // continuation. | 676 // continuation. |
| 683 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { | 677 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
| 684 ScheduleConfigImpl(old_job.session->routing_info(), | 678 ScheduleConfigImpl(old_job.session->routing_info(), |
| 685 old_job.session->workers(), | 679 old_job.session->workers(), |
| 686 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); | 680 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION)); |
| 687 } else { | 681 } else { |
| 688 // For all other purposes(nudge and poll) we schedule a retry nudge. | 682 // For all other purposes(nudge and poll) we schedule a retry nudge. |
| 689 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), | 683 ScheduleNudgeImpl(TimeDelta::FromSeconds(0), |
| 690 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION), | 684 GetUpdatesFromNudgeSource(NUDGE_SOURCE_CONTINUATION), |
| (...skipping 27 matching lines...) Expand all Loading... | |
| 718 const SyncSessionJob& old_job) { | 712 const SyncSessionJob& old_job) { |
| 719 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 713 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| 720 // This if conditions should be compiled out in retail builds. | 714 // This if conditions should be compiled out in retail builds. |
| 721 if (IsBackingOff()) { | 715 if (IsBackingOff()) { |
| 722 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); | 716 DCHECK(wait_interval_->timer.IsRunning() || old_job.is_canary_job); |
| 723 } | 717 } |
| 724 | 718 |
| 725 TimeDelta length = delay_provider_->GetDelay( | 719 TimeDelta length = delay_provider_->GetDelay( |
| 726 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); | 720 IsBackingOff() ? wait_interval_->length : TimeDelta::FromSeconds(1)); |
| 727 | 721 |
| 728 VLOG(1) << "SyncerThread(" << this << ")" | 722 SVLOG(2) << "In handle continuation error. Old job purpose is " |
| 729 << " In handle continuation error. Old job purpose is " | 723 << old_job.purpose << " . The time delta(ms) is " |
| 730 << old_job.purpose << " . The time delta(ms) is " | 724 << length.InMilliseconds(); |
| 731 << length.InMilliseconds(); | |
| 732 | 725 |
| 733 // This will reset the had_nudge variable as well. | 726 // This will reset the had_nudge variable as well. |
| 734 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, | 727 wait_interval_.reset(new WaitInterval(WaitInterval::EXPONENTIAL_BACKOFF, |
| 735 length)); | 728 length)); |
| 736 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { | 729 if (old_job.purpose == SyncSessionJob::CONFIGURATION) { |
| 737 SyncSession* old = old_job.session.get(); | 730 SyncSession* old = old_job.session.get(); |
| 738 SyncSession* s(new SyncSession(session_context_.get(), this, | 731 SyncSession* s(new SyncSession(session_context_.get(), this, |
| 739 old->source(), old->routing_info(), old->workers())); | 732 old->source(), old->routing_info(), old->workers())); |
| 740 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, | 733 SyncSessionJob job(old_job.purpose, TimeTicks::Now() + length, |
| 741 make_linked_ptr(s), false, FROM_HERE); | 734 make_linked_ptr(s), false, FROM_HERE); |
| (...skipping 27 matching lines...) Expand all Loading... | |
| 769 (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); | 762 (rand_sign * (last_delay.InSeconds() / kBackoffRandomizationFactor)); |
| 770 | 763 |
| 771 // Cap the backoff interval. | 764 // Cap the backoff interval. |
| 772 backoff_s = std::max(static_cast<int64>(1), | 765 backoff_s = std::max(static_cast<int64>(1), |
| 773 std::min(backoff_s, kMaxBackoffSeconds)); | 766 std::min(backoff_s, kMaxBackoffSeconds)); |
| 774 | 767 |
| 775 return TimeDelta::FromSeconds(backoff_s); | 768 return TimeDelta::FromSeconds(backoff_s); |
| 776 } | 769 } |
| 777 | 770 |
| 778 void SyncerThread::Stop() { | 771 void SyncerThread::Stop() { |
| 779 VLOG(1) << "SyncerThread(" << this << ")" << " stop called"; | 772 SVLOG(2) << "stop called"; |
| 780 syncer_->RequestEarlyExit(); // Safe to call from any thread. | 773 syncer_->RequestEarlyExit(); // Safe to call from any thread. |
| 781 session_context_->connection_manager()->RemoveListener(this); | 774 session_context_->connection_manager()->RemoveListener(this); |
| 782 thread_.Stop(); | 775 thread_.Stop(); |
| 783 } | 776 } |
| 784 | 777 |
| 785 void SyncerThread::DoCanaryJob() { | 778 void SyncerThread::DoCanaryJob() { |
| 786 VLOG(1) << "SyncerThread(" << this << ")" << " Do canary job"; | 779 SVLOG(2) << "Do canary job"; |
| 787 DoPendingJobIfPossible(true); | 780 DoPendingJobIfPossible(true); |
| 788 } | 781 } |
| 789 | 782 |
| 790 void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) { | 783 void SyncerThread::DoPendingJobIfPossible(bool is_canary_job) { |
| 791 SyncSessionJob* job_to_execute = NULL; | 784 SyncSessionJob* job_to_execute = NULL; |
| 792 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() | 785 if (mode_ == CONFIGURATION_MODE && wait_interval_.get() |
| 793 && wait_interval_->pending_configure_job.get()) { | 786 && wait_interval_->pending_configure_job.get()) { |
| 794 VLOG(1) << "SyncerThread(" << this << ")" << " Found pending configure job"; | 787 SVLOG(2) << "Found pending configure job"; |
| 795 job_to_execute = wait_interval_->pending_configure_job.get(); | 788 job_to_execute = wait_interval_->pending_configure_job.get(); |
| 796 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { | 789 } else if (mode_ == NORMAL_MODE && pending_nudge_.get()) { |
| 797 VLOG(1) << "SyncerThread(" << this << ")" << " Found pending nudge job"; | 790 SVLOG(2) << "Found pending nudge job"; |
| 798 // Pending jobs mostly have time from the past. Reset it so this job | 791 // Pending jobs mostly have time from the past. Reset it so this job |
| 799 // will get executed. | 792 // will get executed. |
| 800 if (pending_nudge_->scheduled_start < TimeTicks::Now()) | 793 if (pending_nudge_->scheduled_start < TimeTicks::Now()) |
| 801 pending_nudge_->scheduled_start = TimeTicks::Now(); | 794 pending_nudge_->scheduled_start = TimeTicks::Now(); |
| 802 | 795 |
| 803 scoped_ptr<SyncSession> session(CreateSyncSession( | 796 scoped_ptr<SyncSession> session(CreateSyncSession( |
| 804 pending_nudge_->session->source())); | 797 pending_nudge_->session->source())); |
| 805 | 798 |
| 806 // Also the routing info might have been changed since we cached the | 799 // Also the routing info might have been changed since we cached the |
| 807 // pending nudge. Update it by coalescing to the latest. | 800 // pending nudge. Update it by coalescing to the latest. |
| 808 pending_nudge_->session->Coalesce(*(session.get())); | 801 pending_nudge_->session->Coalesce(*(session.get())); |
| 809 // The pending nudge would be cleared in the DoSyncSessionJob function. | 802 // The pending nudge would be cleared in the DoSyncSessionJob function. |
| 810 job_to_execute = pending_nudge_.get(); | 803 job_to_execute = pending_nudge_.get(); |
| 811 } | 804 } |
| 812 | 805 |
| 813 if (job_to_execute != NULL) { | 806 if (job_to_execute != NULL) { |
| 814 VLOG(1) << "SyncerThread(" << this << ")" << " Executing pending job"; | 807 SVLOG(2) << "Executing pending job"; |
| 815 SyncSessionJob copy = *job_to_execute; | 808 SyncSessionJob copy = *job_to_execute; |
| 816 copy.is_canary_job = is_canary_job; | 809 copy.is_canary_job = is_canary_job; |
| 817 DoSyncSessionJob(copy); | 810 DoSyncSessionJob(copy); |
| 818 } | 811 } |
| 819 } | 812 } |
| 820 | 813 |
| 821 SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) { | 814 SyncSession* SyncerThread::CreateSyncSession(const SyncSourceInfo& source) { |
| 822 ModelSafeRoutingInfo routes; | 815 ModelSafeRoutingInfo routes; |
| 823 std::vector<ModelSafeWorker*> workers; | 816 std::vector<ModelSafeWorker*> workers; |
| 824 session_context_->registrar()->GetModelSafeRoutingInfo(&routes); | 817 session_context_->registrar()->GetModelSafeRoutingInfo(&routes); |
| (...skipping 12 matching lines...) Expand all Loading... | |
| 837 ModelTypePayloadMap types_with_payloads = | 830 ModelTypePayloadMap types_with_payloads = |
| 838 syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); | 831 syncable::ModelTypePayloadMapFromRoutingInfo(r, std::string()); |
| 839 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); | 832 SyncSourceInfo info(GetUpdatesCallerInfo::PERIODIC, types_with_payloads); |
| 840 SyncSession* s = CreateSyncSession(info); | 833 SyncSession* s = CreateSyncSession(info); |
| 841 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s, | 834 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), SyncSessionJob::POLL, s, |
| 842 FROM_HERE); | 835 FROM_HERE); |
| 843 } | 836 } |
| 844 | 837 |
| 845 void SyncerThread::Unthrottle() { | 838 void SyncerThread::Unthrottle() { |
| 846 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); | 839 DCHECK_EQ(WaitInterval::THROTTLED, wait_interval_->mode); |
| 847 VLOG(1) << "SyncerThread(" << this << ")" << " Unthrottled.."; | 840 SVLOG(2) << "Unthrottled."; |
| 848 DoCanaryJob(); | 841 DoCanaryJob(); |
| 849 wait_interval_.reset(); | 842 wait_interval_.reset(); |
| 850 } | 843 } |
| 851 | 844 |
| 852 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { | 845 void SyncerThread::Notify(SyncEngineEvent::EventCause cause) { |
| 853 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 846 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| 854 session_context_->NotifyListeners(SyncEngineEvent(cause)); | 847 session_context_->NotifyListeners(SyncEngineEvent(cause)); |
| 855 } | 848 } |
| 856 | 849 |
| 857 bool SyncerThread::IsBackingOff() const { | 850 bool SyncerThread::IsBackingOff() const { |
| (...skipping 19 matching lines...) Expand all Loading... | |
| 877 syncer_short_poll_interval_seconds_ = new_interval; | 870 syncer_short_poll_interval_seconds_ = new_interval; |
| 878 } | 871 } |
| 879 | 872 |
| 880 void SyncerThread::OnReceivedLongPollIntervalUpdate( | 873 void SyncerThread::OnReceivedLongPollIntervalUpdate( |
| 881 const base::TimeDelta& new_interval) { | 874 const base::TimeDelta& new_interval) { |
| 882 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); | 875 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); |
| 883 syncer_long_poll_interval_seconds_ = new_interval; | 876 syncer_long_poll_interval_seconds_ = new_interval; |
| 884 } | 877 } |
| 885 | 878 |
| 886 void SyncerThread::OnShouldStopSyncingPermanently() { | 879 void SyncerThread::OnShouldStopSyncingPermanently() { |
| 887 VLOG(1) << "SyncerThread(" << this << ")" | 880 SVLOG(2) << "OnShouldStopSyncingPermanently"; |
| 888 << " OnShouldStopSyncingPermanently"; | |
| 889 syncer_->RequestEarlyExit(); // Thread-safe. | 881 syncer_->RequestEarlyExit(); // Thread-safe. |
| 890 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); | 882 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); |
| 891 } | 883 } |
| 892 | 884 |
| 893 void SyncerThread::OnServerConnectionEvent( | 885 void SyncerThread::OnServerConnectionEvent( |
| 894 const ServerConnectionEvent& event) { | 886 const ServerConnectionEvent& event) { |
| 895 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, | 887 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, |
| 896 &SyncerThread::CheckServerConnectionManagerStatus, | 888 &SyncerThread::CheckServerConnectionManagerStatus, |
| 897 event.connection_code)); | 889 event.connection_code)); |
| 898 } | 890 } |
| 899 | 891 |
| 900 void SyncerThread::set_notifications_enabled(bool notifications_enabled) { | 892 void SyncerThread::set_notifications_enabled(bool notifications_enabled) { |
| 901 session_context_->set_notifications_enabled(notifications_enabled); | 893 session_context_->set_notifications_enabled(notifications_enabled); |
| 902 } | 894 } |
| 903 | 895 |
| 896 #undef SVLOG | |
| 897 | |
| 904 } // browser_sync | 898 } // browser_sync |
| OLD | NEW |