Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(28)

Side by Side Diff: chrome/browser/sync/engine/syncer_thread2.cc

Issue 6690020: sync: hook up ServerConnectionManager <> SyncerThread2 and tie up more loose ends (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/chrome/debug
Patch Set: rebase Created 9 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/sync/engine/syncer_thread2.h" 5 #include "chrome/browser/sync/engine/syncer_thread2.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/rand_util.h" 9 #include "base/rand_util.h"
10 #include "chrome/browser/sync/engine/syncer.h" 10 #include "chrome/browser/sync/engine/syncer.h"
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
71 server_connection_ok_(false), 71 server_connection_ok_(false),
72 delay_provider_(new DelayProvider()), 72 delay_provider_(new DelayProvider()),
73 syncer_(syncer), 73 syncer_(syncer),
74 session_context_(context) { 74 session_context_(context) {
75 } 75 }
76 76
77 SyncerThread::~SyncerThread() { 77 SyncerThread::~SyncerThread() {
78 DCHECK(!thread_.IsRunning()); 78 DCHECK(!thread_.IsRunning());
79 } 79 }
80 80
81 void SyncerThread::Start(Mode mode) { 81 void SyncerThread::CheckServerConnectionManagerStatus(
82 if (!thread_.IsRunning() && !thread_.Start()) { 82 HttpResponse::ServerConnectionCode code) {
83 NOTREACHED() << "Unable to start SyncerThread."; 83 // Note, be careful when adding cases here because if the SyncerThread
84 return; 84 // thinks there is no valid connection as determined by this method, it
85 // will drop out of *all* forward progress sync loops (it won't poll and it
86 // will queue up Talk notifications but not actually call SyncShare) until
87 // some external action causes a ServerConnectionManager to broadcast that
88 // a valid connection has been re-established.
89 if (HttpResponse::CONNECTION_UNAVAILABLE == code ||
90 HttpResponse::SYNC_AUTH_ERROR == code) {
91 server_connection_ok_ = false;
92 } else if (HttpResponse::SERVER_CONNECTION_OK == code) {
93 server_connection_ok_ = true;
lipalani1 2011/03/25 23:18:30 This seems to be a little odd. There is no catch a
tim (not reviewing) 2011/03/27 19:23:19 We surface both 'reachable' and 'server up' and 's
94 }
95 }
96
97 void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) {
lipalani1 2011/03/25 23:18:30 I am assuming start can be called from only one th
tim (not reviewing) 2011/03/27 19:23:19 Right. We can't easily add a DCHECK though since
98 if (!thread_.IsRunning()) {
99 if (!thread_.Start()) {
100 NOTREACHED() << "Unable to start SyncerThread.";
101 return;
102 }
103 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
104 this, &SyncerThread::WatchConnectionManager));
105 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
106 this, &SyncerThread::SendInitialSnapshot));
85 } 107 }
86 108
87 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 109 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
88 this, &SyncerThread::StartImpl, mode)); 110 this, &SyncerThread::StartImpl, mode, make_linked_ptr(callback)));
89 } 111 }
90 112
91 void SyncerThread::StartImpl(Mode mode) { 113 void SyncerThread::SendInitialSnapshot() {
114 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
115 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this,
116 SyncSourceInfo(), ModelSafeRoutingInfo(),
117 std::vector<ModelSafeWorker*>()));
118 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
119 sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot());
120 event.snapshot = &snapshot;
121 session_context_->NotifyListeners(event);
122 }
123
124 void SyncerThread::WatchConnectionManager() {
125 ServerConnectionManager* scm = session_context_->connection_manager();
126 CheckServerConnectionManagerStatus(scm->server_status());
lipalani1 2011/03/25 23:18:30 The name(CheckServerConnectionManagerStatus) seems
tim (not reviewing) 2011/03/27 19:23:19 It checks the passed-in value to figure out what t
127 scm->AddListener(this);
128 }
129
130 void SyncerThread::StartImpl(Mode mode,
131 linked_ptr<ModeChangeCallback> callback) {
92 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 132 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
93 DCHECK(!session_context_->account_name().empty()); 133 DCHECK(!session_context_->account_name().empty());
94 DCHECK(syncer_.get()); 134 DCHECK(syncer_.get());
95 mode_ = mode; 135 mode_ = mode;
96 AdjustPolling(NULL); // Will kick start poll timer if needed. 136 AdjustPolling(NULL); // Will kick start poll timer if needed.
137 if (callback.get())
138 callback->Run();
lipalani1 2011/03/25 23:18:30 This callback seems to be never used? what is the
tim (not reviewing) 2011/03/27 19:23:19 See StartConfigurationMode in syncapi -- we use th
97 } 139 }
98 140
99 bool SyncerThread::ShouldRunJob(SyncSessionJobPurpose purpose, 141 bool SyncerThread::ShouldRunJob(SyncSessionJobPurpose purpose,
100 const TimeTicks& scheduled_start) { 142 const TimeTicks& scheduled_start) {
101 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 143 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
102 144
103 // Check wait interval. 145 // Check wait interval.
104 if (wait_interval_.get()) { 146 if (wait_interval_.get()) {
105 // TODO(tim): Consider different handling for CLEAR_USER_DATA (i.e. permit 147 // TODO(tim): Consider different handling for CLEAR_USER_DATA (i.e. permit
106 // when throttled). 148 // when throttled).
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after
214 SyncSession* session = new SyncSession(session_context_.get(), this, 256 SyncSession* session = new SyncSession(session_context_.get(), this,
215 SyncSourceInfo(), ModelSafeRoutingInfo(), 257 SyncSourceInfo(), ModelSafeRoutingInfo(),
216 std::vector<ModelSafeWorker*>()); 258 std::vector<ModelSafeWorker*>());
217 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CLEAR_USER_DATA, session); 259 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CLEAR_USER_DATA, session);
218 } 260 }
219 261
220 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, 262 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay,
221 NudgeSource source, const ModelTypePayloadMap& types_with_payloads) { 263 NudgeSource source, const ModelTypePayloadMap& types_with_payloads) {
222 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 264 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
223 TimeTicks rough_start = TimeTicks::Now() + delay; 265 TimeTicks rough_start = TimeTicks::Now() + delay;
266 if (!ShouldRunJob(NUDGE, rough_start)) {
267 LOG(WARNING) << "Dropping nudge at scheduling time, source = "
268 << source;
269 return;
270 }
lipalani1 2011/03/25 23:18:30 Is this needed here? When we run the job we check
tim (not reviewing) 2011/03/27 19:23:19 I originally thought the same, but it turns out it
224 271
225 // Note we currently nudge for all types regardless of the ones incurring 272 // Note we currently nudge for all types regardless of the ones incurring
226 // the nudge. Doing different would throw off some syncer commands like 273 // the nudge. Doing different would throw off some syncer commands like
227 // CleanupDisabledTypes. We may want to change this in the future. 274 // CleanupDisabledTypes. We may want to change this in the future.
228 ModelSafeRoutingInfo routes; 275 ModelSafeRoutingInfo routes;
229 std::vector<ModelSafeWorker*> workers; 276 std::vector<ModelSafeWorker*> workers;
230 session_context_->registrar()->GetModelSafeRoutingInfo(&routes); 277 session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
231 session_context_->registrar()->GetWorkers(&workers); 278 session_context_->registrar()->GetWorkers(&workers);
232 SyncSourceInfo info(GetUpdatesFromNudgeSource(source), 279 SyncSourceInfo info(GetUpdatesFromNudgeSource(source),
233 types_with_payloads); 280 types_with_payloads);
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after
342 case POLL: 389 case POLL:
343 *start = SYNCER_BEGIN; 390 *start = SYNCER_BEGIN;
344 return; 391 return;
345 default: 392 default:
346 NOTREACHED(); 393 NOTREACHED();
347 } 394 }
348 } 395 }
349 396
350 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { 397 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
351 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 398 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
399 if (!ShouldRunJob(job.purpose, job.scheduled_start))
400 return;
lipalani1 2011/03/25 23:18:30 We need some kind of logging here. I am assuming t
tim (not reviewing) 2011/03/27 19:23:19 good question -- see bug 71616. We currently do d
352 401
353 if (job.purpose == NUDGE) { 402 if (job.purpose == NUDGE) {
354 DCHECK(pending_nudge_.get()); 403 DCHECK(pending_nudge_.get());
355 if (pending_nudge_->session != job.session) 404 if (pending_nudge_->session != job.session)
356 return; // Another nudge must have been scheduled in in the meantime. 405 return; // Another nudge must have been scheduled in in the meantime.
357 pending_nudge_.reset(); 406 pending_nudge_.reset();
358 } 407 }
359 408
360 SyncerStep begin(SYNCER_BEGIN); 409 SyncerStep begin(SYNCER_BEGIN);
361 SyncerStep end(SYNCER_END); 410 SyncerStep end(SYNCER_END);
362 SetSyncerStepsForPurpose(job.purpose, &begin, &end); 411 SetSyncerStepsForPurpose(job.purpose, &begin, &end);
363 412
364 bool has_more_to_sync = true; 413 bool has_more_to_sync = true;
365 bool did_job = false;
366 while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) { 414 while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) {
367 VLOG(1) << "SyncerThread: Calling SyncShare."; 415 VLOG(1) << "SyncerThread: Calling SyncShare.";
368 did_job = true;
369 // Synchronously perform the sync session from this thread. 416 // Synchronously perform the sync session from this thread.
370 syncer_->SyncShare(job.session.get(), begin, end); 417 syncer_->SyncShare(job.session.get(), begin, end);
371 has_more_to_sync = job.session->HasMoreToSync(); 418 has_more_to_sync = job.session->HasMoreToSync();
372 if (has_more_to_sync) 419 if (has_more_to_sync)
373 job.session->ResetTransientState(); 420 job.session->ResetTransientState();
374 } 421 }
375 VLOG(1) << "SyncerThread: Done SyncShare looping."; 422 VLOG(1) << "SyncerThread: Done SyncShare looping.";
376 if (did_job) 423 FinishSyncSessionJob(job);
377 FinishSyncSessionJob(job); 424 }
425
426 void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) {
427 if (old_job.purpose == CONFIGURATION) {
428 // Whatever types were part of a configuration task will have had updates
429 // downloaded. For that reason, we make sure they get recorded in the
430 // event that they get disabled at a later time.
431 ModelSafeRoutingInfo r(session_context_->previous_session_routing_info());
432 if (!r.empty()) {
433 ModelSafeRoutingInfo temp_r;
434 ModelSafeRoutingInfo old_info(old_job.session->routing_info());
435 std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(),
436 std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin()));
437 session_context_->set_previous_session_routing_info(temp_r);
438 }
439 } else {
440 session_context_->set_previous_session_routing_info(
441 old_job.session->routing_info());
442 }
378 } 443 }
379 444
380 void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { 445 void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) {
381 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 446 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
382 // Update timing information for how often datatypes are triggering nudges. 447 // Update timing information for how often datatypes are triggering nudges.
383 base::TimeTicks now = TimeTicks::Now(); 448 base::TimeTicks now = TimeTicks::Now();
384 if (!last_sync_session_end_time_.is_null()) { 449 if (!last_sync_session_end_time_.is_null()) {
385 ModelTypePayloadMap::const_iterator iter; 450 ModelTypePayloadMap::const_iterator iter;
386 for (iter = job.session->source().types.begin(); 451 for (iter = job.session->source().types.begin();
387 iter != job.session->source().types.end(); 452 iter != job.session->source().types.end();
388 ++iter) { 453 ++iter) {
389 syncable::PostTimeToTypeHistogram(iter->first, 454 syncable::PostTimeToTypeHistogram(iter->first,
390 now - last_sync_session_end_time_); 455 now - last_sync_session_end_time_);
391 } 456 }
392 } 457 }
393 last_sync_session_end_time_ = now; 458 last_sync_session_end_time_ = now;
459 UpdateCarryoverSessionState(job);
394 if (IsSyncingCurrentlySilenced()) 460 if (IsSyncingCurrentlySilenced())
395 return; // Nothing to do. 461 return; // Nothing to do.
396 462
397 VLOG(1) << "Updating the next polling time after SyncMain"; 463 VLOG(1) << "Updating the next polling time after SyncMain";
398 ScheduleNextSync(job); 464 ScheduleNextSync(job);
399 } 465 }
400 466
401 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { 467 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) {
402 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 468 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
403 DCHECK(!old_job.session->HasMoreToSync()); 469 DCHECK(!old_job.session->HasMoreToSync());
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
499 565
500 // Cap the backoff interval. 566 // Cap the backoff interval.
501 backoff_s = std::max(static_cast<int64>(1), 567 backoff_s = std::max(static_cast<int64>(1),
502 std::min(backoff_s, kMaxBackoffSeconds)); 568 std::min(backoff_s, kMaxBackoffSeconds));
503 569
504 return TimeDelta::FromSeconds(backoff_s); 570 return TimeDelta::FromSeconds(backoff_s);
505 } 571 }
506 572
507 void SyncerThread::Stop() { 573 void SyncerThread::Stop() {
508 syncer_->RequestEarlyExit(); // Safe to call from any thread. 574 syncer_->RequestEarlyExit(); // Safe to call from any thread.
575 session_context_->connection_manager()->RemoveListener(this);
509 thread_.Stop(); 576 thread_.Stop();
510 Notify(SyncEngineEvent::SYNCER_THREAD_EXITING);
511 } 577 }
512 578
513 void SyncerThread::DoCanaryJob() { 579 void SyncerThread::DoCanaryJob() {
514 DCHECK(pending_nudge_.get()); 580 DCHECK(pending_nudge_.get());
515 wait_interval_->had_nudge = false; 581 wait_interval_->had_nudge = false;
516 SyncSessionJob copy = {pending_nudge_->purpose, 582 SyncSessionJob copy = {pending_nudge_->purpose,
517 pending_nudge_->scheduled_start, 583 pending_nudge_->scheduled_start,
518 pending_nudge_->session}; 584 pending_nudge_->session};
519 DoSyncSessionJob(copy); 585 DoSyncSessionJob(copy);
520 } 586 }
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
570 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 636 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
571 syncer_long_poll_interval_seconds_ = new_interval; 637 syncer_long_poll_interval_seconds_ = new_interval;
572 } 638 }
573 639
574 void SyncerThread::OnShouldStopSyncingPermanently() { 640 void SyncerThread::OnShouldStopSyncingPermanently() {
575 syncer_->RequestEarlyExit(); // Thread-safe. 641 syncer_->RequestEarlyExit(); // Thread-safe.
576 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); 642 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
577 } 643 }
578 644
579 void SyncerThread::OnServerConnectionEvent( 645 void SyncerThread::OnServerConnectionEvent(
580 const ServerConnectionEvent& event) { 646 const ServerConnectionEvent2& event) {
581 NOTIMPLEMENTED(); 647 CheckServerConnectionManagerStatus(event.connection_code);
582 } 648 }
583 649
584 void SyncerThread::set_notifications_enabled(bool notifications_enabled) { 650 void SyncerThread::set_notifications_enabled(bool notifications_enabled) {
585 session_context_->set_notifications_enabled(notifications_enabled); 651 session_context_->set_notifications_enabled(notifications_enabled);
586 } 652 }
587 653
588 } // s3 654 } // s3
589 } // browser_sync 655 } // browser_sync
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698