| OLD | NEW |
| 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 #include "chrome/browser/sync/engine/syncer_thread.h" | 4 #include "chrome/browser/sync/engine/syncer_thread.h" |
| 5 | 5 |
| 6 #include "build/build_config.h" | 6 #include "build/build_config.h" |
| 7 | 7 |
| 8 #ifdef OS_MACOSX | 8 #ifdef OS_MACOSX |
| 9 #include <CoreFoundation/CFNumber.h> | 9 #include <CoreFoundation/CFNumber.h> |
| 10 #include <IOKit/IOTypes.h> | 10 #include <IOKit/IOTypes.h> |
| (...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 119 return new SyncerThreadTimedStop(command_channel, mgr, connection_manager, | 119 return new SyncerThreadTimedStop(command_channel, mgr, connection_manager, |
| 120 all_status, model_safe_worker); | 120 all_status, model_safe_worker); |
| 121 } else { | 121 } else { |
| 122 // The default SyncerThread implementation, which does not time-out when | 122 // The default SyncerThread implementation, which does not time-out when |
| 123 // Stop is called. | 123 // Stop is called. |
| 124 return new SyncerThread(command_channel, mgr, connection_manager, | 124 return new SyncerThread(command_channel, mgr, connection_manager, |
| 125 all_status, model_safe_worker); | 125 all_status, model_safe_worker); |
| 126 } | 126 } |
| 127 } | 127 } |
| 128 | 128 |
| 129 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { | 129 void SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { |
| 130 AutoLock lock(lock_); | 130 AutoLock lock(lock_); |
| 131 if (vault_.syncer_ == NULL) { | 131 if (vault_.syncer_ == NULL) { |
| 132 return false; | 132 return; |
| 133 } | 133 } |
| 134 |
| 135 if (vault_.current_wait_interval_.had_nudge_during_backoff) { |
| 136 // Drop nudges on the floor if we've already had one since starting this |
| 137 // stage of exponential backoff. |
| 138 return; |
| 139 } |
| 140 |
| 134 NudgeSyncImpl(milliseconds_from_now, source); | 141 NudgeSyncImpl(milliseconds_from_now, source); |
| 135 return true; | |
| 136 } | 142 } |
| 137 | 143 |
| 138 SyncerThread::SyncerThread() | 144 SyncerThread::SyncerThread() |
| 139 : thread_main_started_(false, false), | 145 : thread_main_started_(false, false), |
| 140 thread_("SyncEngine_SyncerThread"), | 146 thread_("SyncEngine_SyncerThread"), |
| 141 vault_field_changed_(&lock_), | 147 vault_field_changed_(&lock_), |
| 142 p2p_authenticated_(false), | 148 p2p_authenticated_(false), |
| 143 p2p_subscribed_(false), | 149 p2p_subscribed_(false), |
| 144 client_command_hookup_(NULL), | 150 client_command_hookup_(NULL), |
| 145 conn_mgr_hookup_(NULL), | 151 conn_mgr_hookup_(NULL), |
| (...skipping 150 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 296 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); | 302 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); |
| 297 } | 303 } |
| 298 } | 304 } |
| 299 | 305 |
| 300 void SyncerThread::ThreadMainLoop() { | 306 void SyncerThread::ThreadMainLoop() { |
| 301 // This is called with lock_ acquired. | 307 // This is called with lock_ acquired. |
| 302 lock_.AssertAcquired(); | 308 lock_.AssertAcquired(); |
| 303 LOG(INFO) << "In thread main loop."; | 309 LOG(INFO) << "In thread main loop."; |
| 304 | 310 |
| 305 // Use the short poll value by default. | 311 // Use the short poll value by default. |
| 306 TimeDelta poll_seconds = | 312 vault_.current_wait_interval_.poll_delta = |
| 307 TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_); | 313 TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_); |
| 308 int user_idle_milliseconds = 0; | 314 int user_idle_milliseconds = 0; |
| 309 TimeTicks last_sync_time; | 315 TimeTicks last_sync_time; |
| 310 bool initial_sync_for_thread = true; | 316 bool initial_sync_for_thread = true; |
| 311 bool continue_sync_cycle = false; | 317 bool continue_sync_cycle = false; |
| 312 | 318 |
| 313 while (!vault_.stop_syncer_thread_) { | 319 while (!vault_.stop_syncer_thread_) { |
| 314 // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as | 320 // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as |
| 315 // below) because we cannot poll until these conditions are met, so we wait | 321 // below) because we cannot poll until these conditions are met, so we wait |
| 316 // indefinitely. | 322 // indefinitely. |
| 317 if (!vault_.connected_) { | 323 if (!vault_.connected_) { |
| 318 LOG(INFO) << "Syncer thread waiting for connection."; | 324 LOG(INFO) << "Syncer thread waiting for connection."; |
| 319 while (!vault_.connected_ && !vault_.stop_syncer_thread_) | 325 while (!vault_.connected_ && !vault_.stop_syncer_thread_) |
| 320 vault_field_changed_.Wait(); | 326 vault_field_changed_.Wait(); |
| 321 LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection."; | 327 LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection."; |
| 322 continue; | 328 continue; |
| 323 } | 329 } |
| 324 | 330 |
| 325 if (vault_.syncer_ == NULL) { | 331 if (vault_.syncer_ == NULL) { |
| 326 LOG(INFO) << "Syncer thread waiting for database initialization."; | 332 LOG(INFO) << "Syncer thread waiting for database initialization."; |
| 327 while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_) | 333 while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_) |
| 328 vault_field_changed_.Wait(); | 334 vault_field_changed_.Wait(); |
| 329 LOG_IF(INFO, !(vault_.syncer_ == NULL)) | 335 LOG_IF(INFO, !(vault_.syncer_ == NULL)) |
| 330 << "Syncer was found after DB started."; | 336 << "Syncer was found after DB started."; |
| 331 continue; | 337 continue; |
| 332 } | 338 } |
| 333 | 339 |
| 334 const TimeTicks next_poll = last_sync_time + poll_seconds; | 340 const TimeTicks next_poll = last_sync_time + |
| 341 vault_.current_wait_interval_.poll_delta; |
| 335 const TimeTicks end_wait = | 342 const TimeTicks end_wait = |
| 336 !vault_.nudge_queue_.empty() && | 343 !vault_.nudge_queue_.empty() && |
| 337 vault_.nudge_queue_.top().first < next_poll ? | 344 vault_.nudge_queue_.top().first < next_poll ? |
| 338 vault_.nudge_queue_.top().first : next_poll; | 345 vault_.nudge_queue_.top().first : next_poll; |
| 339 LOG(INFO) << "end_wait is " << end_wait.ToInternalValue(); | 346 LOG(INFO) << "end_wait is " << end_wait.ToInternalValue(); |
| 340 LOG(INFO) << "next_poll is " << next_poll.ToInternalValue(); | 347 LOG(INFO) << "next_poll is " << next_poll.ToInternalValue(); |
| 341 | 348 |
| 342 // We block until the CV is signaled (e.g a control field changed, loss of | 349 // We block until the CV is signaled (e.g a control field changed, loss of |
| 343 // network connection, nudge, spurious, etc), or the poll interval elapses. | 350 // network connection, nudge, spurious, etc), or the poll interval elapses. |
| 344 TimeDelta sleep_time = end_wait - TimeTicks::Now(); | 351 TimeDelta sleep_time = end_wait - TimeTicks::Now(); |
| 345 if (sleep_time > TimeDelta::FromSeconds(0)) { | 352 if (sleep_time > TimeDelta::FromSeconds(0)) { |
| 346 vault_field_changed_.TimedWait(sleep_time); | 353 vault_field_changed_.TimedWait(sleep_time); |
| 347 | 354 |
| 348 if (TimeTicks::Now() < end_wait) { | 355 if (TimeTicks::Now() < end_wait) { |
| 349 // Didn't timeout. Could be a spurious signal, or a signal corresponding | 356 // Didn't timeout. Could be a spurious signal, or a signal corresponding |
| 350 // to an actual change in one of our control fields. By continuing here | 357 // to an actual change in one of our control fields. By continuing here |
| 351 // we perform the typical "always recheck conditions when signaled", | 358 // we perform the typical "always recheck conditions when signaled", |
| 352 // (typically handled by a while(condition_not_met) cv.wait() construct) | 359 // (typically handled by a while(condition_not_met) cv.wait() construct) |
| 353 // because we jump to the top of the loop. The main difference is we | 360 // because we jump to the top of the loop. The main difference is we |
| 354 // recalculate the wait interval, but last_sync_time won't have changed. | 361 // recalculate the wait interval, but last_sync_time won't have changed. |
| 355 // So if we were signaled by a nudge (for ex.) we'll grab the new nudge | 362 // So if we were signaled by a nudge (for ex.) we'll grab the new nudge |
| 356 // off the queue and wait for that delta. If it was a spurious signal, | 363 // off the queue and wait for that delta. If it was a spurious signal, |
| 357 // we'll keep waiting for the same moment in time as we just were. | 364 // we'll keep waiting for the same moment in time as we just were. |
| 358 continue; | 365 continue; |
| 359 } | 366 } |
| 360 } | 367 } |
| 361 | 368 |
| 362 // Handle a nudge, caused by either a notification or a local bookmark | 369 // Handle a nudge, caused by either a notification or a local bookmark |
| 363 // event. This will also update the source of the following SyncMain call. | 370 // event. This will also update the source of the following SyncMain call. |
| 364 UpdateNudgeSource(&continue_sync_cycle, &initial_sync_for_thread); | 371 bool nudged = UpdateNudgeSource(continue_sync_cycle, |
| 372 &initial_sync_for_thread); |
| 365 | 373 |
| 366 LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue(); | 374 LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue(); |
| 367 SyncMain(vault_.syncer_); | 375 SyncMain(vault_.syncer_); |
| 368 last_sync_time = TimeTicks::Now(); | 376 last_sync_time = TimeTicks::Now(); |
| 369 | 377 |
| 370 LOG(INFO) << "Updating the next polling time after SyncMain"; | 378 LOG(INFO) << "Updating the next polling time after SyncMain"; |
| 371 poll_seconds = TimeDelta::FromSeconds(CalculatePollingWaitTime( | 379 vault_.current_wait_interval_ = CalculatePollingWaitTime( |
| 372 allstatus_->status(), static_cast<int>(poll_seconds.InSeconds()), | 380 allstatus_->status(), |
| 373 &user_idle_milliseconds, &continue_sync_cycle)); | 381 static_cast<int>(vault_.current_wait_interval_.poll_delta.InSeconds()), |
| 382 &user_idle_milliseconds, &continue_sync_cycle, nudged); |
| 374 } | 383 } |
| 375 | |
| 376 } | 384 } |
| 377 | 385 |
| 378 // We check how long the user's been idle and sync less often if the machine is | 386 // We check how long the user's been idle and sync less often if the machine is |
| 379 // not in use. The aim is to reduce server load. | 387 // not in use. The aim is to reduce server load. |
| 380 // TODO(timsteele): Should use Time(Delta). | 388 // TODO(timsteele): Should use Time(Delta). |
| 381 int SyncerThread::CalculatePollingWaitTime( | 389 SyncerThread::WaitInterval SyncerThread::CalculatePollingWaitTime( |
| 382 const AllStatus::Status& status, | 390 const AllStatus::Status& status, |
| 383 int last_poll_wait, // Time in seconds. | 391 int last_poll_wait, // Time in seconds. |
| 384 int* user_idle_milliseconds, | 392 int* user_idle_milliseconds, |
| 385 bool* continue_sync_cycle) { | 393 bool* continue_sync_cycle, |
| 394 bool was_nudged) { |
| 395 lock_.AssertAcquired(); // We access 'vault' in here, so we need the lock. |
| 396 WaitInterval return_interval; |
| 386 bool is_continuing_sync_cyle = *continue_sync_cycle; | 397 bool is_continuing_sync_cyle = *continue_sync_cycle; |
| 387 *continue_sync_cycle = false; | 398 *continue_sync_cycle = false; |
| 388 | 399 |
| 389 // Determine if the syncer has unfinished work to do from allstatus_. | 400 // Determine if the syncer has unfinished work to do from allstatus_. |
| 390 const bool syncer_has_work_to_do = | 401 const bool syncer_has_work_to_do = |
| 391 status.updates_available > status.updates_received | 402 status.updates_available > status.updates_received |
| 392 || status.unsynced_count > 0; | 403 || status.unsynced_count > 0; |
| 393 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do; | 404 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do; |
| 394 | 405 |
| 395 // First calculate the expected wait time, figuring in any backoff because of | 406 // First calculate the expected wait time, figuring in any backoff because of |
| 396 // user idle time. next_wait is in seconds | 407 // user idle time. next_wait is in seconds |
| 397 syncer_polling_interval_ = (!status.notifications_enabled) ? | 408 syncer_polling_interval_ = (!status.notifications_enabled) ? |
| 398 syncer_short_poll_interval_seconds_ : | 409 syncer_short_poll_interval_seconds_ : |
| 399 syncer_long_poll_interval_seconds_; | 410 syncer_long_poll_interval_seconds_; |
| 400 int default_next_wait = syncer_polling_interval_; | 411 int default_next_wait = syncer_polling_interval_; |
| 401 int actual_next_wait = default_next_wait; | 412 return_interval.poll_delta = TimeDelta::FromSeconds(default_next_wait); |
| 402 | 413 |
| 403 if (syncer_has_work_to_do) { | 414 if (syncer_has_work_to_do) { |
| 404 // Provide exponential backoff due to consecutive errors, else attempt to | 415 // Provide exponential backoff due to consecutive errors, else attempt to |
| 405 // complete the work as soon as possible. | 416 // complete the work as soon as possible. |
| 406 if (!is_continuing_sync_cyle) { | 417 if (is_continuing_sync_cyle) { |
| 407 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0); | 418 return_interval.mode = WaitInterval::EXPONENTIAL_BACKOFF; |
| 419 if (was_nudged && vault_.current_wait_interval_.mode == |
| 420 WaitInterval::EXPONENTIAL_BACKOFF) { |
| 421 // We were nudged, it failed, and we were already in backoff. |
| 422 return_interval.had_nudge_during_backoff = true; |
| 423 // Keep exponent for exponential backoff the same in this case. |
| 424 return_interval.poll_delta = vault_.current_wait_interval_.poll_delta; |
| 425 } else { |
| 426 // We weren't nudged, or we were in a NORMAL wait interval until now. |
| 427 return_interval.poll_delta = TimeDelta::FromSeconds( |
| 428 AllStatus::GetRecommendedDelaySeconds(last_poll_wait)); |
| 429 } |
| 408 } else { | 430 } else { |
| 409 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait); | 431 // No consecutive error. |
| 432 return_interval.poll_delta = TimeDelta::FromSeconds( |
| 433 AllStatus::GetRecommendedDelaySeconds(0)); |
| 410 } | 434 } |
| 411 *continue_sync_cycle = true; | 435 *continue_sync_cycle = true; |
| 412 } else if (!status.notifications_enabled) { | 436 } else if (!status.notifications_enabled) { |
| 413 // Ensure that we start exponential backoff from our base polling | 437 // Ensure that we start exponential backoff from our base polling |
| 414 // interval when we are not continuing a sync cycle. | 438 // interval when we are not continuing a sync cycle. |
| 415 last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_); | 439 last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_); |
| 416 | 440 |
| 417 // Did the user start interacting with the computer again? | 441 // Did the user start interacting with the computer again? |
| 418 // If so, revise our idle time (and probably next_sync_time) downwards | 442 // If so, revise our idle time (and probably next_sync_time) downwards |
| 419 int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime(); | 443 int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime(); |
| 420 if (new_idle_time < *user_idle_milliseconds) { | 444 if (new_idle_time < *user_idle_milliseconds) { |
| 421 *user_idle_milliseconds = new_idle_time; | 445 *user_idle_milliseconds = new_idle_time; |
| 422 } | 446 } |
| 423 actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000, | 447 return_interval.poll_delta = TimeDelta::FromMilliseconds( |
| 424 *user_idle_milliseconds) / 1000; | 448 CalculateSyncWaitTime(last_poll_wait * 1000, |
| 425 DCHECK_GE(actual_next_wait, default_next_wait); | 449 *user_idle_milliseconds)); |
| 450 DCHECK_GE(return_interval.poll_delta.InSeconds(), default_next_wait); |
| 426 } | 451 } |
| 427 | 452 |
| 428 LOG(INFO) << "Sync wait: idle " << default_next_wait | 453 LOG(INFO) << "Sync wait: idle " << default_next_wait |
| 429 << " non-idle or backoff " << actual_next_wait << "."; | 454 << " non-idle or backoff " |
| 455 << return_interval.poll_delta.InSeconds() << "."; |
| 430 | 456 |
| 431 return actual_next_wait; | 457 return return_interval; |
| 432 } | 458 } |
| 433 | 459 |
| 434 void SyncerThread::ThreadMain() { | 460 void SyncerThread::ThreadMain() { |
| 435 AutoLock lock(lock_); | 461 AutoLock lock(lock_); |
| 436 // Signal Start() to let it know we've made it safely onto the message loop, | 462 // Signal Start() to let it know we've made it safely onto the message loop, |
| 437 // and unblock it's caller. | 463 // and unblock it's caller. |
| 438 thread_main_started_.Signal(); | 464 thread_main_started_.Signal(); |
| 439 ThreadMainLoop(); | 465 ThreadMainLoop(); |
| 440 LOG(INFO) << "Syncer thread ThreadMain is done."; | 466 LOG(INFO) << "Syncer thread ThreadMain is done."; |
| 441 } | 467 } |
| 442 | 468 |
| 443 void SyncerThread::SyncMain(Syncer* syncer) { | 469 void SyncerThread::SyncMain(Syncer* syncer) { |
| 444 CHECK(syncer); | 470 CHECK(syncer); |
| 445 AutoUnlock unlock(lock_); | 471 AutoUnlock unlock(lock_); |
| 446 while (syncer->SyncShare()) { | 472 while (syncer->SyncShare()) { |
| 447 LOG(INFO) << "Looping in sync share"; | 473 LOG(INFO) << "Looping in sync share"; |
| 448 } | 474 } |
| 449 LOG(INFO) << "Done looping in sync share"; | 475 LOG(INFO) << "Done looping in sync share"; |
| 450 } | 476 } |
| 451 | 477 |
| 452 void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle, | 478 bool SyncerThread::UpdateNudgeSource(bool continue_sync_cycle, |
| 453 bool* initial_sync) { | 479 bool* initial_sync) { |
| 454 bool nudged = false; | 480 bool nudged = false; |
| 455 NudgeSource nudge_source = kUnknown; | 481 NudgeSource nudge_source = kUnknown; |
| 456 // Has the previous sync cycle completed? | 482 // Has the previous sync cycle completed? |
| 457 if (continue_sync_cycle) { | 483 if (continue_sync_cycle) { |
| 458 nudge_source = kContinuation; | 484 nudge_source = kContinuation; |
| 459 } | 485 } |
| 460 // Update the nudge source if a new nudge has come through during the | 486 // Update the nudge source if a new nudge has come through during the |
| 461 // previous sync cycle. | 487 // previous sync cycle. |
| 462 while (!vault_.nudge_queue_.empty() && | 488 while (!vault_.nudge_queue_.empty() && |
| 463 TimeTicks::Now() >= vault_.nudge_queue_.top().first) { | 489 TimeTicks::Now() >= vault_.nudge_queue_.top().first) { |
| 464 if (!nudged) { | 490 if (!nudged) { |
| 465 nudge_source = vault_.nudge_queue_.top().second; | 491 nudge_source = vault_.nudge_queue_.top().second; |
| 466 *continue_sync_cycle = false; // Reset the continuation token on nudge. | |
| 467 nudged = true; | 492 nudged = true; |
| 468 } | 493 } |
| 469 vault_.nudge_queue_.pop(); | 494 vault_.nudge_queue_.pop(); |
| 470 } | 495 } |
| 471 SetUpdatesSource(nudged, nudge_source, initial_sync); | 496 SetUpdatesSource(nudged, nudge_source, initial_sync); |
| 497 return nudged; |
| 472 } | 498 } |
| 473 | 499 |
| 474 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, | 500 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, |
| 475 bool* initial_sync) { | 501 bool* initial_sync) { |
| 476 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = | 502 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = |
| 477 sync_pb::GetUpdatesCallerInfo::UNKNOWN; | 503 sync_pb::GetUpdatesCallerInfo::UNKNOWN; |
| 478 if (*initial_sync) { | 504 if (*initial_sync) { |
| 479 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; | 505 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; |
| 480 *initial_sync = false; | 506 *initial_sync = false; |
| 481 } else if (!nudged) { | 507 } else if (!nudged) { |
| (...skipping 154 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 636 break; | 662 break; |
| 637 } | 663 } |
| 638 | 664 |
| 639 if (NULL != vault_.syncer_) { | 665 if (NULL != vault_.syncer_) { |
| 640 vault_.syncer_->set_notifications_enabled( | 666 vault_.syncer_->set_notifications_enabled( |
| 641 p2p_authenticated_ && p2p_subscribed_); | 667 p2p_authenticated_ && p2p_subscribed_); |
| 642 } | 668 } |
| 643 } | 669 } |
| 644 | 670 |
| 645 } // namespace browser_sync | 671 } // namespace browser_sync |
| OLD | NEW |