Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(198)

Side by Side Diff: chrome/browser/sync/engine/syncer_thread2.cc

Issue 6690020: sync: hook up ServerConnectionManager <> SyncerThread2 and tie up more loose ends (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src/chrome/debug
Patch Set: fix Created 9 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "chrome/browser/sync/engine/syncer_thread2.h" 5 #include "chrome/browser/sync/engine/syncer_thread2.h"
6 6
7 #include <algorithm> 7 #include <algorithm>
8 8
9 #include "base/rand_util.h" 9 #include "base/rand_util.h"
10 #include "chrome/browser/sync/engine/syncer.h" 10 #include "chrome/browser/sync/engine/syncer.h"
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
71 server_connection_ok_(false), 71 server_connection_ok_(false),
72 delay_provider_(new DelayProvider()), 72 delay_provider_(new DelayProvider()),
73 syncer_(syncer), 73 syncer_(syncer),
74 session_context_(context) { 74 session_context_(context) {
75 } 75 }
76 76
77 SyncerThread::~SyncerThread() { 77 SyncerThread::~SyncerThread() {
78 DCHECK(!thread_.IsRunning()); 78 DCHECK(!thread_.IsRunning());
79 } 79 }
80 80
81 void SyncerThread::Start(Mode mode) { 81 void SyncerThread::CheckServerConnectionManagerStatus(
82 if (!thread_.IsRunning() && !thread_.Start()) { 82 HttpResponse::ServerConnectionCode code) {
83 NOTREACHED() << "Unable to start SyncerThread."; 83 // Note, be careful when adding cases here because if the SyncerThread
84 return; 84 // thinks there is no valid connection as determined by this method, it
85 // will drop out of *all* forward progress sync loops (it won't poll and it
86 // will queue up Talk notifications but not actually call SyncShare) until
87 // some external action causes a ServerConnectionManager to broadcast that
88 // a valid connection has been re-established.
89 if (HttpResponse::CONNECTION_UNAVAILABLE == code ||
90 HttpResponse::SYNC_AUTH_ERROR == code) {
91 server_connection_ok_ = false;
92 } else if (HttpResponse::SERVER_CONNECTION_OK == code) {
93 server_connection_ok_ = true;
94 }
95 }
96
97 void SyncerThread::Start(Mode mode, ModeChangeCallback* callback) {
98 if (!thread_.IsRunning()) {
99 if (!thread_.Start()) {
100 NOTREACHED() << "Unable to start SyncerThread.";
101 return;
102 }
103 WatchConnectionManager();
104 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
105 this, &SyncerThread::SendInitialSnapshot));
85 } 106 }
86 107
87 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod( 108 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(
88 this, &SyncerThread::StartImpl, mode)); 109 this, &SyncerThread::StartImpl, mode, make_linked_ptr(callback)));
89 } 110 }
90 111
91 void SyncerThread::StartImpl(Mode mode) { 112 void SyncerThread::SendInitialSnapshot() {
113 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
114 scoped_ptr<SyncSession> dummy(new SyncSession(session_context_.get(), this,
115 SyncSourceInfo(), ModelSafeRoutingInfo(),
116 std::vector<ModelSafeWorker*>()));
117 SyncEngineEvent event(SyncEngineEvent::STATUS_CHANGED);
118 sessions::SyncSessionSnapshot snapshot(dummy->TakeSnapshot());
119 event.snapshot = &snapshot;
120 session_context_->NotifyListeners(event);
121 }
122
123 void SyncerThread::WatchConnectionManager() {
124 ServerConnectionManager* scm = session_context_->connection_manager();
125 CheckServerConnectionManagerStatus(scm->server_status());
126 scm->AddListener(this);
127 }
128
129 void SyncerThread::StartImpl(Mode mode,
130 linked_ptr<ModeChangeCallback> callback) {
92 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 131 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
93 DCHECK(!session_context_->account_name().empty()); 132 DCHECK(!session_context_->account_name().empty());
94 DCHECK(syncer_.get()); 133 DCHECK(syncer_.get());
95 mode_ = mode; 134 mode_ = mode;
96 AdjustPolling(NULL); // Will kick start poll timer if needed. 135 AdjustPolling(NULL); // Will kick start poll timer if needed.
136 if (callback.get())
137 callback->Run();
97 } 138 }
98 139
99 bool SyncerThread::ShouldRunJob(SyncSessionJobPurpose purpose, 140 bool SyncerThread::ShouldRunJob(SyncSessionJobPurpose purpose,
100 const TimeTicks& scheduled_start) { 141 const TimeTicks& scheduled_start) {
101 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 142 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
102 143
103 // Check wait interval. 144 // Check wait interval.
104 if (wait_interval_.get()) { 145 if (wait_interval_.get()) {
105 // TODO(tim): Consider different handling for CLEAR_USER_DATA (i.e. permit 146 // TODO(tim): Consider different handling for CLEAR_USER_DATA (i.e. permit
106 // when throttled). 147 // when throttled).
(...skipping 107 matching lines...) Expand 10 before | Expand all | Expand 10 after
214 SyncSession* session = new SyncSession(session_context_.get(), this, 255 SyncSession* session = new SyncSession(session_context_.get(), this,
215 SyncSourceInfo(), ModelSafeRoutingInfo(), 256 SyncSourceInfo(), ModelSafeRoutingInfo(),
216 std::vector<ModelSafeWorker*>()); 257 std::vector<ModelSafeWorker*>());
217 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CLEAR_USER_DATA, session); 258 ScheduleSyncSessionJob(TimeDelta::FromSeconds(0), CLEAR_USER_DATA, session);
218 } 259 }
219 260
220 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay, 261 void SyncerThread::ScheduleNudgeImpl(const TimeDelta& delay,
221 NudgeSource source, const ModelTypePayloadMap& types_with_payloads) { 262 NudgeSource source, const ModelTypePayloadMap& types_with_payloads) {
222 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 263 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
223 TimeTicks rough_start = TimeTicks::Now() + delay; 264 TimeTicks rough_start = TimeTicks::Now() + delay;
265 if (!ShouldRunJob(NUDGE, rough_start)) {
266 LOG(WARNING) << "Dropping nudge at scheduling time, source = "
267 << source;
268 return;
269 }
224 270
225 // Note we currently nudge for all types regardless of the ones incurring 271 // Note we currently nudge for all types regardless of the ones incurring
226 // the nudge. Doing different would throw off some syncer commands like 272 // the nudge. Doing different would throw off some syncer commands like
227 // CleanupDisabledTypes. We may want to change this in the future. 273 // CleanupDisabledTypes. We may want to change this in the future.
228 ModelSafeRoutingInfo routes; 274 ModelSafeRoutingInfo routes;
229 std::vector<ModelSafeWorker*> workers; 275 std::vector<ModelSafeWorker*> workers;
230 session_context_->registrar()->GetModelSafeRoutingInfo(&routes); 276 session_context_->registrar()->GetModelSafeRoutingInfo(&routes);
231 session_context_->registrar()->GetWorkers(&workers); 277 session_context_->registrar()->GetWorkers(&workers);
232 SyncSourceInfo info(GetUpdatesFromNudgeSource(source), 278 SyncSourceInfo info(GetUpdatesFromNudgeSource(source),
233 types_with_payloads); 279 types_with_payloads);
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after
342 case POLL: 388 case POLL:
343 *start = SYNCER_BEGIN; 389 *start = SYNCER_BEGIN;
344 return; 390 return;
345 default: 391 default:
346 NOTREACHED(); 392 NOTREACHED();
347 } 393 }
348 } 394 }
349 395
350 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) { 396 void SyncerThread::DoSyncSessionJob(const SyncSessionJob& job) {
351 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 397 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
398 if (!ShouldRunJob(job.purpose, job.scheduled_start)) {
399 LOG(WARNING) << "Dropping nudge at DoSyncSessionJob, source = "
400 << job.session->source().updates_source;
401 return;
402 }
352 403
353 if (job.purpose == NUDGE) { 404 if (job.purpose == NUDGE) {
354 DCHECK(pending_nudge_.get()); 405 DCHECK(pending_nudge_.get());
355 if (pending_nudge_->session != job.session) 406 if (pending_nudge_->session != job.session)
356 return; // Another nudge must have been scheduled in in the meantime. 407 return; // Another nudge must have been scheduled in in the meantime.
357 pending_nudge_.reset(); 408 pending_nudge_.reset();
358 } 409 }
359 410
360 SyncerStep begin(SYNCER_BEGIN); 411 SyncerStep begin(SYNCER_BEGIN);
361 SyncerStep end(SYNCER_END); 412 SyncerStep end(SYNCER_END);
362 SetSyncerStepsForPurpose(job.purpose, &begin, &end); 413 SetSyncerStepsForPurpose(job.purpose, &begin, &end);
363 414
364 bool has_more_to_sync = true; 415 bool has_more_to_sync = true;
365 bool did_job = false;
366 while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) { 416 while (ShouldRunJob(job.purpose, job.scheduled_start) && has_more_to_sync) {
367 VLOG(1) << "SyncerThread: Calling SyncShare."; 417 VLOG(1) << "SyncerThread: Calling SyncShare.";
368 did_job = true;
369 // Synchronously perform the sync session from this thread. 418 // Synchronously perform the sync session from this thread.
370 syncer_->SyncShare(job.session.get(), begin, end); 419 syncer_->SyncShare(job.session.get(), begin, end);
371 has_more_to_sync = job.session->HasMoreToSync(); 420 has_more_to_sync = job.session->HasMoreToSync();
372 if (has_more_to_sync) 421 if (has_more_to_sync)
373 job.session->ResetTransientState(); 422 job.session->ResetTransientState();
374 } 423 }
375 VLOG(1) << "SyncerThread: Done SyncShare looping."; 424 VLOG(1) << "SyncerThread: Done SyncShare looping.";
376 if (did_job) 425 FinishSyncSessionJob(job);
377 FinishSyncSessionJob(job); 426 }
427
428 void SyncerThread::UpdateCarryoverSessionState(const SyncSessionJob& old_job) {
429 if (old_job.purpose == CONFIGURATION) {
430 // Whatever types were part of a configuration task will have had updates
431 // downloaded. For that reason, we make sure they get recorded in the
432 // event that they get disabled at a later time.
433 ModelSafeRoutingInfo r(session_context_->previous_session_routing_info());
434 if (!r.empty()) {
435 ModelSafeRoutingInfo temp_r;
436 ModelSafeRoutingInfo old_info(old_job.session->routing_info());
437 std::set_union(r.begin(), r.end(), old_info.begin(), old_info.end(),
438 std::insert_iterator<ModelSafeRoutingInfo>(temp_r, temp_r.begin()));
439 session_context_->set_previous_session_routing_info(temp_r);
440 }
441 } else {
442 session_context_->set_previous_session_routing_info(
443 old_job.session->routing_info());
444 }
378 } 445 }
379 446
380 void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) { 447 void SyncerThread::FinishSyncSessionJob(const SyncSessionJob& job) {
381 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 448 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
382 // Update timing information for how often datatypes are triggering nudges. 449 // Update timing information for how often datatypes are triggering nudges.
383 base::TimeTicks now = TimeTicks::Now(); 450 base::TimeTicks now = TimeTicks::Now();
384 if (!last_sync_session_end_time_.is_null()) { 451 if (!last_sync_session_end_time_.is_null()) {
385 ModelTypePayloadMap::const_iterator iter; 452 ModelTypePayloadMap::const_iterator iter;
386 for (iter = job.session->source().types.begin(); 453 for (iter = job.session->source().types.begin();
387 iter != job.session->source().types.end(); 454 iter != job.session->source().types.end();
388 ++iter) { 455 ++iter) {
389 syncable::PostTimeToTypeHistogram(iter->first, 456 syncable::PostTimeToTypeHistogram(iter->first,
390 now - last_sync_session_end_time_); 457 now - last_sync_session_end_time_);
391 } 458 }
392 } 459 }
393 last_sync_session_end_time_ = now; 460 last_sync_session_end_time_ = now;
461 UpdateCarryoverSessionState(job);
394 if (IsSyncingCurrentlySilenced()) 462 if (IsSyncingCurrentlySilenced())
395 return; // Nothing to do. 463 return; // Nothing to do.
396 464
397 VLOG(1) << "Updating the next polling time after SyncMain"; 465 VLOG(1) << "Updating the next polling time after SyncMain";
398 ScheduleNextSync(job); 466 ScheduleNextSync(job);
399 } 467 }
400 468
401 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) { 469 void SyncerThread::ScheduleNextSync(const SyncSessionJob& old_job) {
402 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 470 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
403 DCHECK(!old_job.session->HasMoreToSync()); 471 DCHECK(!old_job.session->HasMoreToSync());
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after
499 567
500 // Cap the backoff interval. 568 // Cap the backoff interval.
501 backoff_s = std::max(static_cast<int64>(1), 569 backoff_s = std::max(static_cast<int64>(1),
502 std::min(backoff_s, kMaxBackoffSeconds)); 570 std::min(backoff_s, kMaxBackoffSeconds));
503 571
504 return TimeDelta::FromSeconds(backoff_s); 572 return TimeDelta::FromSeconds(backoff_s);
505 } 573 }
506 574
507 void SyncerThread::Stop() { 575 void SyncerThread::Stop() {
508 syncer_->RequestEarlyExit(); // Safe to call from any thread. 576 syncer_->RequestEarlyExit(); // Safe to call from any thread.
577 session_context_->connection_manager()->RemoveListener(this);
509 thread_.Stop(); 578 thread_.Stop();
510 Notify(SyncEngineEvent::SYNCER_THREAD_EXITING);
511 } 579 }
512 580
513 void SyncerThread::DoCanaryJob() { 581 void SyncerThread::DoCanaryJob() {
514 DCHECK(pending_nudge_.get()); 582 DCHECK(pending_nudge_.get());
515 wait_interval_->had_nudge = false; 583 wait_interval_->had_nudge = false;
516 SyncSessionJob copy = {pending_nudge_->purpose, 584 SyncSessionJob copy = {pending_nudge_->purpose,
517 pending_nudge_->scheduled_start, 585 pending_nudge_->scheduled_start,
518 pending_nudge_->session}; 586 pending_nudge_->session};
519 DoSyncSessionJob(copy); 587 DoSyncSessionJob(copy);
520 } 588 }
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
570 DCHECK_EQ(MessageLoop::current(), thread_.message_loop()); 638 DCHECK_EQ(MessageLoop::current(), thread_.message_loop());
571 syncer_long_poll_interval_seconds_ = new_interval; 639 syncer_long_poll_interval_seconds_ = new_interval;
572 } 640 }
573 641
574 void SyncerThread::OnShouldStopSyncingPermanently() { 642 void SyncerThread::OnShouldStopSyncingPermanently() {
575 syncer_->RequestEarlyExit(); // Thread-safe. 643 syncer_->RequestEarlyExit(); // Thread-safe.
576 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY); 644 Notify(SyncEngineEvent::STOP_SYNCING_PERMANENTLY);
577 } 645 }
578 646
579 void SyncerThread::OnServerConnectionEvent( 647 void SyncerThread::OnServerConnectionEvent(
580 const ServerConnectionEvent& event) { 648 const ServerConnectionEvent2& event) {
581 NOTIMPLEMENTED(); 649 thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,
650 &SyncerThread::CheckServerConnectionManagerStatus,
651 event.connection_code));
582 } 652 }
583 653
584 void SyncerThread::set_notifications_enabled(bool notifications_enabled) { 654 void SyncerThread::set_notifications_enabled(bool notifications_enabled) {
585 session_context_->set_notifications_enabled(notifications_enabled); 655 session_context_->set_notifications_enabled(notifications_enabled);
586 } 656 }
587 657
588 } // s3 658 } // s3
589 } // browser_sync 659 } // browser_sync
OLDNEW
« no previous file with comments | « chrome/browser/sync/engine/syncer_thread2.h ('k') | chrome/browser/sync/engine/syncer_thread2_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698