Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(924)

Side by Side Diff: chrome/browser/sync/engine/syncer_thread.cc

Issue 275015: For sync exponential backoff, allow one nudge per exponential backoff interva... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 11 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 #include "chrome/browser/sync/engine/syncer_thread.h" 4 #include "chrome/browser/sync/engine/syncer_thread.h"
5 5
6 #include "build/build_config.h" 6 #include "build/build_config.h"
7 7
8 #ifdef OS_MACOSX 8 #ifdef OS_MACOSX
9 #include <CoreFoundation/CFNumber.h> 9 #include <CoreFoundation/CFNumber.h>
10 #include <IOKit/IOTypes.h> 10 #include <IOKit/IOTypes.h>
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after
119 return new SyncerThreadTimedStop(command_channel, mgr, connection_manager, 119 return new SyncerThreadTimedStop(command_channel, mgr, connection_manager,
120 all_status, model_safe_worker); 120 all_status, model_safe_worker);
121 } else { 121 } else {
122 // The default SyncerThread implementation, which does not time-out when 122 // The default SyncerThread implementation, which does not time-out when
123 // Stop is called. 123 // Stop is called.
124 return new SyncerThread(command_channel, mgr, connection_manager, 124 return new SyncerThread(command_channel, mgr, connection_manager,
125 all_status, model_safe_worker); 125 all_status, model_safe_worker);
126 } 126 }
127 } 127 }
128 128
129 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { 129 void SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) {
130 AutoLock lock(lock_); 130 AutoLock lock(lock_);
131 if (vault_.syncer_ == NULL) { 131 if (vault_.syncer_ == NULL) {
132 return false; 132 return;
133 } 133 }
134
135 if (vault_.current_wait_interval_.had_nudge_during_backoff) {
136 // Drop nudges on the floor if we've already had one since starting this
137 // stage of exponential backoff.
138 return;
139 }
140
134 NudgeSyncImpl(milliseconds_from_now, source); 141 NudgeSyncImpl(milliseconds_from_now, source);
135 return true;
136 } 142 }
137 143
138 SyncerThread::SyncerThread() 144 SyncerThread::SyncerThread()
139 : thread_main_started_(false, false), 145 : thread_main_started_(false, false),
140 thread_("SyncEngine_SyncerThread"), 146 thread_("SyncEngine_SyncerThread"),
141 vault_field_changed_(&lock_), 147 vault_field_changed_(&lock_),
142 p2p_authenticated_(false), 148 p2p_authenticated_(false),
143 p2p_subscribed_(false), 149 p2p_subscribed_(false),
144 client_command_hookup_(NULL), 150 client_command_hookup_(NULL),
145 conn_mgr_hookup_(NULL), 151 conn_mgr_hookup_(NULL),
(...skipping 150 matching lines...) Expand 10 before | Expand all | Expand 10 after
296 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); 302 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval();
297 } 303 }
298 } 304 }
299 305
300 void SyncerThread::ThreadMainLoop() { 306 void SyncerThread::ThreadMainLoop() {
301 // This is called with lock_ acquired. 307 // This is called with lock_ acquired.
302 lock_.AssertAcquired(); 308 lock_.AssertAcquired();
303 LOG(INFO) << "In thread main loop."; 309 LOG(INFO) << "In thread main loop.";
304 310
305 // Use the short poll value by default. 311 // Use the short poll value by default.
306 TimeDelta poll_seconds = 312 vault_.current_wait_interval_.poll_delta =
307 TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_); 313 TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_);
308 int user_idle_milliseconds = 0; 314 int user_idle_milliseconds = 0;
309 TimeTicks last_sync_time; 315 TimeTicks last_sync_time;
310 bool initial_sync_for_thread = true; 316 bool initial_sync_for_thread = true;
311 bool continue_sync_cycle = false; 317 bool continue_sync_cycle = false;
312 318
313 while (!vault_.stop_syncer_thread_) { 319 while (!vault_.stop_syncer_thread_) {
314 // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as 320 // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as
315 // below) because we cannot poll until these conditions are met, so we wait 321 // below) because we cannot poll until these conditions are met, so we wait
316 // indefinitely. 322 // indefinitely.
317 if (!vault_.connected_) { 323 if (!vault_.connected_) {
318 LOG(INFO) << "Syncer thread waiting for connection."; 324 LOG(INFO) << "Syncer thread waiting for connection.";
319 while (!vault_.connected_ && !vault_.stop_syncer_thread_) 325 while (!vault_.connected_ && !vault_.stop_syncer_thread_)
320 vault_field_changed_.Wait(); 326 vault_field_changed_.Wait();
321 LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection."; 327 LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection.";
322 continue; 328 continue;
323 } 329 }
324 330
325 if (vault_.syncer_ == NULL) { 331 if (vault_.syncer_ == NULL) {
326 LOG(INFO) << "Syncer thread waiting for database initialization."; 332 LOG(INFO) << "Syncer thread waiting for database initialization.";
327 while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_) 333 while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_)
328 vault_field_changed_.Wait(); 334 vault_field_changed_.Wait();
329 LOG_IF(INFO, !(vault_.syncer_ == NULL)) 335 LOG_IF(INFO, !(vault_.syncer_ == NULL))
330 << "Syncer was found after DB started."; 336 << "Syncer was found after DB started.";
331 continue; 337 continue;
332 } 338 }
333 339
334 const TimeTicks next_poll = last_sync_time + poll_seconds; 340 const TimeTicks next_poll = last_sync_time +
341 vault_.current_wait_interval_.poll_delta;
335 const TimeTicks end_wait = 342 const TimeTicks end_wait =
336 !vault_.nudge_queue_.empty() && 343 !vault_.nudge_queue_.empty() &&
337 vault_.nudge_queue_.top().first < next_poll ? 344 vault_.nudge_queue_.top().first < next_poll ?
338 vault_.nudge_queue_.top().first : next_poll; 345 vault_.nudge_queue_.top().first : next_poll;
339 LOG(INFO) << "end_wait is " << end_wait.ToInternalValue(); 346 LOG(INFO) << "end_wait is " << end_wait.ToInternalValue();
340 LOG(INFO) << "next_poll is " << next_poll.ToInternalValue(); 347 LOG(INFO) << "next_poll is " << next_poll.ToInternalValue();
341 348
342 // We block until the CV is signaled (e.g a control field changed, loss of 349 // We block until the CV is signaled (e.g a control field changed, loss of
343 // network connection, nudge, spurious, etc), or the poll interval elapses. 350 // network connection, nudge, spurious, etc), or the poll interval elapses.
344 TimeDelta sleep_time = end_wait - TimeTicks::Now(); 351 TimeDelta sleep_time = end_wait - TimeTicks::Now();
345 if (sleep_time > TimeDelta::FromSeconds(0)) { 352 if (sleep_time > TimeDelta::FromSeconds(0)) {
346 vault_field_changed_.TimedWait(sleep_time); 353 vault_field_changed_.TimedWait(sleep_time);
347 354
348 if (TimeTicks::Now() < end_wait) { 355 if (TimeTicks::Now() < end_wait) {
349 // Didn't timeout. Could be a spurious signal, or a signal corresponding 356 // Didn't timeout. Could be a spurious signal, or a signal corresponding
350 // to an actual change in one of our control fields. By continuing here 357 // to an actual change in one of our control fields. By continuing here
351 // we perform the typical "always recheck conditions when signaled", 358 // we perform the typical "always recheck conditions when signaled",
352 // (typically handled by a while(condition_not_met) cv.wait() construct) 359 // (typically handled by a while(condition_not_met) cv.wait() construct)
353 // because we jump to the top of the loop. The main difference is we 360 // because we jump to the top of the loop. The main difference is we
354 // recalculate the wait interval, but last_sync_time won't have changed. 361 // recalculate the wait interval, but last_sync_time won't have changed.
355 // So if we were signaled by a nudge (for ex.) we'll grab the new nudge 362 // So if we were signaled by a nudge (for ex.) we'll grab the new nudge
356 // off the queue and wait for that delta. If it was a spurious signal, 363 // off the queue and wait for that delta. If it was a spurious signal,
357 // we'll keep waiting for the same moment in time as we just were. 364 // we'll keep waiting for the same moment in time as we just were.
358 continue; 365 continue;
359 } 366 }
360 } 367 }
361 368
362 // Handle a nudge, caused by either a notification or a local bookmark 369 // Handle a nudge, caused by either a notification or a local bookmark
363 // event. This will also update the source of the following SyncMain call. 370 // event. This will also update the source of the following SyncMain call.
364 UpdateNudgeSource(&continue_sync_cycle, &initial_sync_for_thread); 371 bool nudged = UpdateNudgeSource(continue_sync_cycle,
372 &initial_sync_for_thread);
365 373
366 LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue(); 374 LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue();
367 SyncMain(vault_.syncer_); 375 SyncMain(vault_.syncer_);
368 last_sync_time = TimeTicks::Now(); 376 last_sync_time = TimeTicks::Now();
369 377
370 LOG(INFO) << "Updating the next polling time after SyncMain"; 378 LOG(INFO) << "Updating the next polling time after SyncMain";
371 poll_seconds = TimeDelta::FromSeconds(CalculatePollingWaitTime( 379 vault_.current_wait_interval_ = CalculatePollingWaitTime(
372 allstatus_->status(), static_cast<int>(poll_seconds.InSeconds()), 380 allstatus_->status(),
373 &user_idle_milliseconds, &continue_sync_cycle)); 381 static_cast<int>(vault_.current_wait_interval_.poll_delta.InSeconds()),
382 &user_idle_milliseconds, &continue_sync_cycle, nudged);
374 } 383 }
375
376 } 384 }
377 385
378 // We check how long the user's been idle and sync less often if the machine is 386 // We check how long the user's been idle and sync less often if the machine is
379 // not in use. The aim is to reduce server load. 387 // not in use. The aim is to reduce server load.
380 // TODO(timsteele): Should use Time(Delta). 388 // TODO(timsteele): Should use Time(Delta).
381 int SyncerThread::CalculatePollingWaitTime( 389 SyncerThread::WaitInterval SyncerThread::CalculatePollingWaitTime(
382 const AllStatus::Status& status, 390 const AllStatus::Status& status,
383 int last_poll_wait, // Time in seconds. 391 int last_poll_wait, // Time in seconds.
384 int* user_idle_milliseconds, 392 int* user_idle_milliseconds,
385 bool* continue_sync_cycle) { 393 bool* continue_sync_cycle,
394 bool was_nudged) {
395 lock_.AssertAcquired(); // We access 'vault' in here, so we need the lock.
396 WaitInterval return_interval;
386 bool is_continuing_sync_cyle = *continue_sync_cycle; 397 bool is_continuing_sync_cyle = *continue_sync_cycle;
387 *continue_sync_cycle = false; 398 *continue_sync_cycle = false;
388 399
389 // Determine if the syncer has unfinished work to do from allstatus_. 400 // Determine if the syncer has unfinished work to do from allstatus_.
390 const bool syncer_has_work_to_do = 401 const bool syncer_has_work_to_do =
391 status.updates_available > status.updates_received 402 status.updates_available > status.updates_received
392 || status.unsynced_count > 0; 403 || status.unsynced_count > 0;
393 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do; 404 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do;
394 405
395 // First calculate the expected wait time, figuring in any backoff because of 406 // First calculate the expected wait time, figuring in any backoff because of
396 // user idle time. next_wait is in seconds 407 // user idle time. next_wait is in seconds
397 syncer_polling_interval_ = (!status.notifications_enabled) ? 408 syncer_polling_interval_ = (!status.notifications_enabled) ?
398 syncer_short_poll_interval_seconds_ : 409 syncer_short_poll_interval_seconds_ :
399 syncer_long_poll_interval_seconds_; 410 syncer_long_poll_interval_seconds_;
400 int default_next_wait = syncer_polling_interval_; 411 int default_next_wait = syncer_polling_interval_;
401 int actual_next_wait = default_next_wait; 412 return_interval.poll_delta = TimeDelta::FromSeconds(default_next_wait);
402 413
403 if (syncer_has_work_to_do) { 414 if (syncer_has_work_to_do) {
404 // Provide exponential backoff due to consecutive errors, else attempt to 415 // Provide exponential backoff due to consecutive errors, else attempt to
405 // complete the work as soon as possible. 416 // complete the work as soon as possible.
406 if (!is_continuing_sync_cyle) { 417 if (is_continuing_sync_cyle) {
407 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0); 418 return_interval.mode = WaitInterval::EXPONENTIAL_BACKOFF;
419 if (was_nudged && vault_.current_wait_interval_.mode ==
420 WaitInterval::EXPONENTIAL_BACKOFF) {
421 // We were nudged, it failed, and we were already in backoff.
422 return_interval.had_nudge_during_backoff = true;
423 // Keep exponent for exponential backoff the same in this case.
424 return_interval.poll_delta = vault_.current_wait_interval_.poll_delta;
425 } else {
426 // We weren't nudged, or we were in a NORMAL wait interval until now.
427 return_interval.poll_delta = TimeDelta::FromSeconds(
428 AllStatus::GetRecommendedDelaySeconds(last_poll_wait));
429 }
408 } else { 430 } else {
409 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait); 431 // No consecutive error.
432 return_interval.poll_delta = TimeDelta::FromSeconds(
433 AllStatus::GetRecommendedDelaySeconds(0));
410 } 434 }
411 *continue_sync_cycle = true; 435 *continue_sync_cycle = true;
412 } else if (!status.notifications_enabled) { 436 } else if (!status.notifications_enabled) {
413 // Ensure that we start exponential backoff from our base polling 437 // Ensure that we start exponential backoff from our base polling
414 // interval when we are not continuing a sync cycle. 438 // interval when we are not continuing a sync cycle.
415 last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_); 439 last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_);
416 440
417 // Did the user start interacting with the computer again? 441 // Did the user start interacting with the computer again?
418 // If so, revise our idle time (and probably next_sync_time) downwards 442 // If so, revise our idle time (and probably next_sync_time) downwards
419 int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime(); 443 int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime();
420 if (new_idle_time < *user_idle_milliseconds) { 444 if (new_idle_time < *user_idle_milliseconds) {
421 *user_idle_milliseconds = new_idle_time; 445 *user_idle_milliseconds = new_idle_time;
422 } 446 }
423 actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000, 447 return_interval.poll_delta = TimeDelta::FromMilliseconds(
424 *user_idle_milliseconds) / 1000; 448 CalculateSyncWaitTime(last_poll_wait * 1000,
425 DCHECK_GE(actual_next_wait, default_next_wait); 449 *user_idle_milliseconds));
450 DCHECK_GE(return_interval.poll_delta.InSeconds(), default_next_wait);
426 } 451 }
427 452
428 LOG(INFO) << "Sync wait: idle " << default_next_wait 453 LOG(INFO) << "Sync wait: idle " << default_next_wait
429 << " non-idle or backoff " << actual_next_wait << "."; 454 << " non-idle or backoff "
455 << return_interval.poll_delta.InSeconds() << ".";
430 456
431 return actual_next_wait; 457 return return_interval;
432 } 458 }
433 459
434 void SyncerThread::ThreadMain() { 460 void SyncerThread::ThreadMain() {
435 AutoLock lock(lock_); 461 AutoLock lock(lock_);
436 // Signal Start() to let it know we've made it safely onto the message loop, 462 // Signal Start() to let it know we've made it safely onto the message loop,
437 // and unblock it's caller. 463 // and unblock it's caller.
438 thread_main_started_.Signal(); 464 thread_main_started_.Signal();
439 ThreadMainLoop(); 465 ThreadMainLoop();
440 LOG(INFO) << "Syncer thread ThreadMain is done."; 466 LOG(INFO) << "Syncer thread ThreadMain is done.";
441 } 467 }
442 468
443 void SyncerThread::SyncMain(Syncer* syncer) { 469 void SyncerThread::SyncMain(Syncer* syncer) {
444 CHECK(syncer); 470 CHECK(syncer);
445 AutoUnlock unlock(lock_); 471 AutoUnlock unlock(lock_);
446 while (syncer->SyncShare()) { 472 while (syncer->SyncShare()) {
447 LOG(INFO) << "Looping in sync share"; 473 LOG(INFO) << "Looping in sync share";
448 } 474 }
449 LOG(INFO) << "Done looping in sync share"; 475 LOG(INFO) << "Done looping in sync share";
450 } 476 }
451 477
452 void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle, 478 bool SyncerThread::UpdateNudgeSource(bool continue_sync_cycle,
453 bool* initial_sync) { 479 bool* initial_sync) {
454 bool nudged = false; 480 bool nudged = false;
455 NudgeSource nudge_source = kUnknown; 481 NudgeSource nudge_source = kUnknown;
456 // Has the previous sync cycle completed? 482 // Has the previous sync cycle completed?
457 if (continue_sync_cycle) { 483 if (continue_sync_cycle) {
458 nudge_source = kContinuation; 484 nudge_source = kContinuation;
459 } 485 }
460 // Update the nudge source if a new nudge has come through during the 486 // Update the nudge source if a new nudge has come through during the
461 // previous sync cycle. 487 // previous sync cycle.
462 while (!vault_.nudge_queue_.empty() && 488 while (!vault_.nudge_queue_.empty() &&
463 TimeTicks::Now() >= vault_.nudge_queue_.top().first) { 489 TimeTicks::Now() >= vault_.nudge_queue_.top().first) {
464 if (!nudged) { 490 if (!nudged) {
465 nudge_source = vault_.nudge_queue_.top().second; 491 nudge_source = vault_.nudge_queue_.top().second;
466 *continue_sync_cycle = false; // Reset the continuation token on nudge.
467 nudged = true; 492 nudged = true;
468 } 493 }
469 vault_.nudge_queue_.pop(); 494 vault_.nudge_queue_.pop();
470 } 495 }
471 SetUpdatesSource(nudged, nudge_source, initial_sync); 496 SetUpdatesSource(nudged, nudge_source, initial_sync);
497 return nudged;
472 } 498 }
473 499
474 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, 500 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source,
475 bool* initial_sync) { 501 bool* initial_sync) {
476 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = 502 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source =
477 sync_pb::GetUpdatesCallerInfo::UNKNOWN; 503 sync_pb::GetUpdatesCallerInfo::UNKNOWN;
478 if (*initial_sync) { 504 if (*initial_sync) {
479 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; 505 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE;
480 *initial_sync = false; 506 *initial_sync = false;
481 } else if (!nudged) { 507 } else if (!nudged) {
(...skipping 154 matching lines...) Expand 10 before | Expand all | Expand 10 after
636 break; 662 break;
637 } 663 }
638 664
639 if (NULL != vault_.syncer_) { 665 if (NULL != vault_.syncer_) {
640 vault_.syncer_->set_notifications_enabled( 666 vault_.syncer_->set_notifications_enabled(
641 p2p_authenticated_ && p2p_subscribed_); 667 p2p_authenticated_ && p2p_subscribed_);
642 } 668 }
643 } 669 }
644 670
645 } // namespace browser_sync 671 } // namespace browser_sync
OLDNEW
« no previous file with comments | « chrome/browser/sync/engine/syncer_thread.h ('k') | chrome/browser/sync/engine/syncer_thread_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698