OLD | NEW |
1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 #include "chrome/browser/sync/engine/syncer_thread.h" | 4 #include "chrome/browser/sync/engine/syncer_thread.h" |
5 | 5 |
6 #include "build/build_config.h" | 6 #include "build/build_config.h" |
7 | 7 |
8 #ifdef OS_MACOSX | 8 #ifdef OS_MACOSX |
9 #include <CoreFoundation/CFNumber.h> | 9 #include <CoreFoundation/CFNumber.h> |
10 #include <IOKit/IOTypes.h> | 10 #include <IOKit/IOTypes.h> |
(...skipping 108 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
119 return new SyncerThreadTimedStop(command_channel, mgr, connection_manager, | 119 return new SyncerThreadTimedStop(command_channel, mgr, connection_manager, |
120 all_status, model_safe_worker); | 120 all_status, model_safe_worker); |
121 } else { | 121 } else { |
122 // The default SyncerThread implementation, which does not time-out when | 122 // The default SyncerThread implementation, which does not time-out when |
123 // Stop is called. | 123 // Stop is called. |
124 return new SyncerThread(command_channel, mgr, connection_manager, | 124 return new SyncerThread(command_channel, mgr, connection_manager, |
125 all_status, model_safe_worker); | 125 all_status, model_safe_worker); |
126 } | 126 } |
127 } | 127 } |
128 | 128 |
129 bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { | 129 void SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { |
130 AutoLock lock(lock_); | 130 AutoLock lock(lock_); |
131 if (vault_.syncer_ == NULL) { | 131 if (vault_.syncer_ == NULL) { |
132 return false; | 132 return; |
133 } | 133 } |
| 134 |
| 135 if (vault_.current_wait_interval_.had_nudge_during_backoff) { |
| 136 // Drop nudges on the floor if we've already had one since starting this |
| 137 // stage of exponential backoff. |
| 138 return; |
| 139 } |
| 140 |
134 NudgeSyncImpl(milliseconds_from_now, source); | 141 NudgeSyncImpl(milliseconds_from_now, source); |
135 return true; | |
136 } | 142 } |
137 | 143 |
138 SyncerThread::SyncerThread() | 144 SyncerThread::SyncerThread() |
139 : thread_main_started_(false, false), | 145 : thread_main_started_(false, false), |
140 thread_("SyncEngine_SyncerThread"), | 146 thread_("SyncEngine_SyncerThread"), |
141 vault_field_changed_(&lock_), | 147 vault_field_changed_(&lock_), |
142 p2p_authenticated_(false), | 148 p2p_authenticated_(false), |
143 p2p_subscribed_(false), | 149 p2p_subscribed_(false), |
144 client_command_hookup_(NULL), | 150 client_command_hookup_(NULL), |
145 conn_mgr_hookup_(NULL), | 151 conn_mgr_hookup_(NULL), |
(...skipping 150 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
296 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); | 302 syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); |
297 } | 303 } |
298 } | 304 } |
299 | 305 |
300 void SyncerThread::ThreadMainLoop() { | 306 void SyncerThread::ThreadMainLoop() { |
301 // This is called with lock_ acquired. | 307 // This is called with lock_ acquired. |
302 lock_.AssertAcquired(); | 308 lock_.AssertAcquired(); |
303 LOG(INFO) << "In thread main loop."; | 309 LOG(INFO) << "In thread main loop."; |
304 | 310 |
305 // Use the short poll value by default. | 311 // Use the short poll value by default. |
306 TimeDelta poll_seconds = | 312 vault_.current_wait_interval_.poll_delta = |
307 TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_); | 313 TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_); |
308 int user_idle_milliseconds = 0; | 314 int user_idle_milliseconds = 0; |
309 TimeTicks last_sync_time; | 315 TimeTicks last_sync_time; |
310 bool initial_sync_for_thread = true; | 316 bool initial_sync_for_thread = true; |
311 bool continue_sync_cycle = false; | 317 bool continue_sync_cycle = false; |
312 | 318 |
313 while (!vault_.stop_syncer_thread_) { | 319 while (!vault_.stop_syncer_thread_) { |
314 // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as | 320 // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as |
315 // below) because we cannot poll until these conditions are met, so we wait | 321 // below) because we cannot poll until these conditions are met, so we wait |
316 // indefinitely. | 322 // indefinitely. |
317 if (!vault_.connected_) { | 323 if (!vault_.connected_) { |
318 LOG(INFO) << "Syncer thread waiting for connection."; | 324 LOG(INFO) << "Syncer thread waiting for connection."; |
319 while (!vault_.connected_ && !vault_.stop_syncer_thread_) | 325 while (!vault_.connected_ && !vault_.stop_syncer_thread_) |
320 vault_field_changed_.Wait(); | 326 vault_field_changed_.Wait(); |
321 LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection."; | 327 LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection."; |
322 continue; | 328 continue; |
323 } | 329 } |
324 | 330 |
325 if (vault_.syncer_ == NULL) { | 331 if (vault_.syncer_ == NULL) { |
326 LOG(INFO) << "Syncer thread waiting for database initialization."; | 332 LOG(INFO) << "Syncer thread waiting for database initialization."; |
327 while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_) | 333 while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_) |
328 vault_field_changed_.Wait(); | 334 vault_field_changed_.Wait(); |
329 LOG_IF(INFO, !(vault_.syncer_ == NULL)) | 335 LOG_IF(INFO, !(vault_.syncer_ == NULL)) |
330 << "Syncer was found after DB started."; | 336 << "Syncer was found after DB started."; |
331 continue; | 337 continue; |
332 } | 338 } |
333 | 339 |
334 const TimeTicks next_poll = last_sync_time + poll_seconds; | 340 const TimeTicks next_poll = last_sync_time + |
| 341 vault_.current_wait_interval_.poll_delta; |
335 const TimeTicks end_wait = | 342 const TimeTicks end_wait = |
336 !vault_.nudge_queue_.empty() && | 343 !vault_.nudge_queue_.empty() && |
337 vault_.nudge_queue_.top().first < next_poll ? | 344 vault_.nudge_queue_.top().first < next_poll ? |
338 vault_.nudge_queue_.top().first : next_poll; | 345 vault_.nudge_queue_.top().first : next_poll; |
339 LOG(INFO) << "end_wait is " << end_wait.ToInternalValue(); | 346 LOG(INFO) << "end_wait is " << end_wait.ToInternalValue(); |
340 LOG(INFO) << "next_poll is " << next_poll.ToInternalValue(); | 347 LOG(INFO) << "next_poll is " << next_poll.ToInternalValue(); |
341 | 348 |
342 // We block until the CV is signaled (e.g a control field changed, loss of | 349 // We block until the CV is signaled (e.g a control field changed, loss of |
343 // network connection, nudge, spurious, etc), or the poll interval elapses. | 350 // network connection, nudge, spurious, etc), or the poll interval elapses. |
344 TimeDelta sleep_time = end_wait - TimeTicks::Now(); | 351 TimeDelta sleep_time = end_wait - TimeTicks::Now(); |
345 if (sleep_time > TimeDelta::FromSeconds(0)) { | 352 if (sleep_time > TimeDelta::FromSeconds(0)) { |
346 vault_field_changed_.TimedWait(sleep_time); | 353 vault_field_changed_.TimedWait(sleep_time); |
347 | 354 |
348 if (TimeTicks::Now() < end_wait) { | 355 if (TimeTicks::Now() < end_wait) { |
349 // Didn't timeout. Could be a spurious signal, or a signal corresponding | 356 // Didn't timeout. Could be a spurious signal, or a signal corresponding |
350 // to an actual change in one of our control fields. By continuing here | 357 // to an actual change in one of our control fields. By continuing here |
351 // we perform the typical "always recheck conditions when signaled", | 358 // we perform the typical "always recheck conditions when signaled", |
352 // (typically handled by a while(condition_not_met) cv.wait() construct) | 359 // (typically handled by a while(condition_not_met) cv.wait() construct) |
353 // because we jump to the top of the loop. The main difference is we | 360 // because we jump to the top of the loop. The main difference is we |
354 // recalculate the wait interval, but last_sync_time won't have changed. | 361 // recalculate the wait interval, but last_sync_time won't have changed. |
355 // So if we were signaled by a nudge (for ex.) we'll grab the new nudge | 362 // So if we were signaled by a nudge (for ex.) we'll grab the new nudge |
356 // off the queue and wait for that delta. If it was a spurious signal, | 363 // off the queue and wait for that delta. If it was a spurious signal, |
357 // we'll keep waiting for the same moment in time as we just were. | 364 // we'll keep waiting for the same moment in time as we just were. |
358 continue; | 365 continue; |
359 } | 366 } |
360 } | 367 } |
361 | 368 |
362 // Handle a nudge, caused by either a notification or a local bookmark | 369 // Handle a nudge, caused by either a notification or a local bookmark |
363 // event. This will also update the source of the following SyncMain call. | 370 // event. This will also update the source of the following SyncMain call. |
364 UpdateNudgeSource(&continue_sync_cycle, &initial_sync_for_thread); | 371 bool nudged = UpdateNudgeSource(continue_sync_cycle, |
| 372 &initial_sync_for_thread); |
365 | 373 |
366 LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue(); | 374 LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue(); |
367 SyncMain(vault_.syncer_); | 375 SyncMain(vault_.syncer_); |
368 last_sync_time = TimeTicks::Now(); | 376 last_sync_time = TimeTicks::Now(); |
369 | 377 |
370 LOG(INFO) << "Updating the next polling time after SyncMain"; | 378 LOG(INFO) << "Updating the next polling time after SyncMain"; |
371 poll_seconds = TimeDelta::FromSeconds(CalculatePollingWaitTime( | 379 vault_.current_wait_interval_ = CalculatePollingWaitTime( |
372 allstatus_->status(), static_cast<int>(poll_seconds.InSeconds()), | 380 allstatus_->status(), |
373 &user_idle_milliseconds, &continue_sync_cycle)); | 381 static_cast<int>(vault_.current_wait_interval_.poll_delta.InSeconds()), |
| 382 &user_idle_milliseconds, &continue_sync_cycle, nudged); |
374 } | 383 } |
375 | |
376 } | 384 } |
377 | 385 |
378 // We check how long the user's been idle and sync less often if the machine is | 386 // We check how long the user's been idle and sync less often if the machine is |
379 // not in use. The aim is to reduce server load. | 387 // not in use. The aim is to reduce server load. |
380 // TODO(timsteele): Should use Time(Delta). | 388 // TODO(timsteele): Should use Time(Delta). |
381 int SyncerThread::CalculatePollingWaitTime( | 389 SyncerThread::WaitInterval SyncerThread::CalculatePollingWaitTime( |
382 const AllStatus::Status& status, | 390 const AllStatus::Status& status, |
383 int last_poll_wait, // Time in seconds. | 391 int last_poll_wait, // Time in seconds. |
384 int* user_idle_milliseconds, | 392 int* user_idle_milliseconds, |
385 bool* continue_sync_cycle) { | 393 bool* continue_sync_cycle, |
| 394 bool was_nudged) { |
| 395 lock_.AssertAcquired(); // We access 'vault' in here, so we need the lock. |
| 396 WaitInterval return_interval; |
386 bool is_continuing_sync_cyle = *continue_sync_cycle; | 397 bool is_continuing_sync_cyle = *continue_sync_cycle; |
387 *continue_sync_cycle = false; | 398 *continue_sync_cycle = false; |
388 | 399 |
389 // Determine if the syncer has unfinished work to do from allstatus_. | 400 // Determine if the syncer has unfinished work to do from allstatus_. |
390 const bool syncer_has_work_to_do = | 401 const bool syncer_has_work_to_do = |
391 status.updates_available > status.updates_received | 402 status.updates_available > status.updates_received |
392 || status.unsynced_count > 0; | 403 || status.unsynced_count > 0; |
393 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do; | 404 LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do; |
394 | 405 |
395 // First calculate the expected wait time, figuring in any backoff because of | 406 // First calculate the expected wait time, figuring in any backoff because of |
396 // user idle time. next_wait is in seconds | 407 // user idle time. next_wait is in seconds |
397 syncer_polling_interval_ = (!status.notifications_enabled) ? | 408 syncer_polling_interval_ = (!status.notifications_enabled) ? |
398 syncer_short_poll_interval_seconds_ : | 409 syncer_short_poll_interval_seconds_ : |
399 syncer_long_poll_interval_seconds_; | 410 syncer_long_poll_interval_seconds_; |
400 int default_next_wait = syncer_polling_interval_; | 411 int default_next_wait = syncer_polling_interval_; |
401 int actual_next_wait = default_next_wait; | 412 return_interval.poll_delta = TimeDelta::FromSeconds(default_next_wait); |
402 | 413 |
403 if (syncer_has_work_to_do) { | 414 if (syncer_has_work_to_do) { |
404 // Provide exponential backoff due to consecutive errors, else attempt to | 415 // Provide exponential backoff due to consecutive errors, else attempt to |
405 // complete the work as soon as possible. | 416 // complete the work as soon as possible. |
406 if (!is_continuing_sync_cyle) { | 417 if (is_continuing_sync_cyle) { |
407 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0); | 418 return_interval.mode = WaitInterval::EXPONENTIAL_BACKOFF; |
| 419 if (was_nudged && vault_.current_wait_interval_.mode == |
| 420 WaitInterval::EXPONENTIAL_BACKOFF) { |
| 421 // We were nudged, it failed, and we were already in backoff. |
| 422 return_interval.had_nudge_during_backoff = true; |
| 423 // Keep exponent for exponential backoff the same in this case. |
| 424 return_interval.poll_delta = vault_.current_wait_interval_.poll_delta; |
| 425 } else { |
| 426 // We weren't nudged, or we were in a NORMAL wait interval until now. |
| 427 return_interval.poll_delta = TimeDelta::FromSeconds( |
| 428 AllStatus::GetRecommendedDelaySeconds(last_poll_wait)); |
| 429 } |
408 } else { | 430 } else { |
409 actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait); | 431 // No consecutive error. |
| 432 return_interval.poll_delta = TimeDelta::FromSeconds( |
| 433 AllStatus::GetRecommendedDelaySeconds(0)); |
410 } | 434 } |
411 *continue_sync_cycle = true; | 435 *continue_sync_cycle = true; |
412 } else if (!status.notifications_enabled) { | 436 } else if (!status.notifications_enabled) { |
413 // Ensure that we start exponential backoff from our base polling | 437 // Ensure that we start exponential backoff from our base polling |
414 // interval when we are not continuing a sync cycle. | 438 // interval when we are not continuing a sync cycle. |
415 last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_); | 439 last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_); |
416 | 440 |
417 // Did the user start interacting with the computer again? | 441 // Did the user start interacting with the computer again? |
418 // If so, revise our idle time (and probably next_sync_time) downwards | 442 // If so, revise our idle time (and probably next_sync_time) downwards |
419 int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime(); | 443 int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime(); |
420 if (new_idle_time < *user_idle_milliseconds) { | 444 if (new_idle_time < *user_idle_milliseconds) { |
421 *user_idle_milliseconds = new_idle_time; | 445 *user_idle_milliseconds = new_idle_time; |
422 } | 446 } |
423 actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000, | 447 return_interval.poll_delta = TimeDelta::FromMilliseconds( |
424 *user_idle_milliseconds) / 1000; | 448 CalculateSyncWaitTime(last_poll_wait * 1000, |
425 DCHECK_GE(actual_next_wait, default_next_wait); | 449 *user_idle_milliseconds)); |
| 450 DCHECK_GE(return_interval.poll_delta.InSeconds(), default_next_wait); |
426 } | 451 } |
427 | 452 |
428 LOG(INFO) << "Sync wait: idle " << default_next_wait | 453 LOG(INFO) << "Sync wait: idle " << default_next_wait |
429 << " non-idle or backoff " << actual_next_wait << "."; | 454 << " non-idle or backoff " |
| 455 << return_interval.poll_delta.InSeconds() << "."; |
430 | 456 |
431 return actual_next_wait; | 457 return return_interval; |
432 } | 458 } |
433 | 459 |
434 void SyncerThread::ThreadMain() { | 460 void SyncerThread::ThreadMain() { |
435 AutoLock lock(lock_); | 461 AutoLock lock(lock_); |
436 // Signal Start() to let it know we've made it safely onto the message loop, | 462 // Signal Start() to let it know we've made it safely onto the message loop, |
437 // and unblock it's caller. | 463 // and unblock it's caller. |
438 thread_main_started_.Signal(); | 464 thread_main_started_.Signal(); |
439 ThreadMainLoop(); | 465 ThreadMainLoop(); |
440 LOG(INFO) << "Syncer thread ThreadMain is done."; | 466 LOG(INFO) << "Syncer thread ThreadMain is done."; |
441 } | 467 } |
442 | 468 |
443 void SyncerThread::SyncMain(Syncer* syncer) { | 469 void SyncerThread::SyncMain(Syncer* syncer) { |
444 CHECK(syncer); | 470 CHECK(syncer); |
445 AutoUnlock unlock(lock_); | 471 AutoUnlock unlock(lock_); |
446 while (syncer->SyncShare()) { | 472 while (syncer->SyncShare()) { |
447 LOG(INFO) << "Looping in sync share"; | 473 LOG(INFO) << "Looping in sync share"; |
448 } | 474 } |
449 LOG(INFO) << "Done looping in sync share"; | 475 LOG(INFO) << "Done looping in sync share"; |
450 } | 476 } |
451 | 477 |
452 void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle, | 478 bool SyncerThread::UpdateNudgeSource(bool continue_sync_cycle, |
453 bool* initial_sync) { | 479 bool* initial_sync) { |
454 bool nudged = false; | 480 bool nudged = false; |
455 NudgeSource nudge_source = kUnknown; | 481 NudgeSource nudge_source = kUnknown; |
456 // Has the previous sync cycle completed? | 482 // Has the previous sync cycle completed? |
457 if (continue_sync_cycle) { | 483 if (continue_sync_cycle) { |
458 nudge_source = kContinuation; | 484 nudge_source = kContinuation; |
459 } | 485 } |
460 // Update the nudge source if a new nudge has come through during the | 486 // Update the nudge source if a new nudge has come through during the |
461 // previous sync cycle. | 487 // previous sync cycle. |
462 while (!vault_.nudge_queue_.empty() && | 488 while (!vault_.nudge_queue_.empty() && |
463 TimeTicks::Now() >= vault_.nudge_queue_.top().first) { | 489 TimeTicks::Now() >= vault_.nudge_queue_.top().first) { |
464 if (!nudged) { | 490 if (!nudged) { |
465 nudge_source = vault_.nudge_queue_.top().second; | 491 nudge_source = vault_.nudge_queue_.top().second; |
466 *continue_sync_cycle = false; // Reset the continuation token on nudge. | |
467 nudged = true; | 492 nudged = true; |
468 } | 493 } |
469 vault_.nudge_queue_.pop(); | 494 vault_.nudge_queue_.pop(); |
470 } | 495 } |
471 SetUpdatesSource(nudged, nudge_source, initial_sync); | 496 SetUpdatesSource(nudged, nudge_source, initial_sync); |
| 497 return nudged; |
472 } | 498 } |
473 | 499 |
474 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, | 500 void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source, |
475 bool* initial_sync) { | 501 bool* initial_sync) { |
476 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = | 502 sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = |
477 sync_pb::GetUpdatesCallerInfo::UNKNOWN; | 503 sync_pb::GetUpdatesCallerInfo::UNKNOWN; |
478 if (*initial_sync) { | 504 if (*initial_sync) { |
479 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; | 505 updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; |
480 *initial_sync = false; | 506 *initial_sync = false; |
481 } else if (!nudged) { | 507 } else if (!nudged) { |
(...skipping 154 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
636 break; | 662 break; |
637 } | 663 } |
638 | 664 |
639 if (NULL != vault_.syncer_) { | 665 if (NULL != vault_.syncer_) { |
640 vault_.syncer_->set_notifications_enabled( | 666 vault_.syncer_->set_notifications_enabled( |
641 p2p_authenticated_ && p2p_subscribed_); | 667 p2p_authenticated_ && p2p_subscribed_); |
642 } | 668 } |
643 } | 669 } |
644 | 670 |
645 } // namespace browser_sync | 671 } // namespace browser_sync |
OLD | NEW |