| Index: chrome/browser/sync/engine/syncer_thread_timed_stop.cc
|
| ===================================================================
|
| --- chrome/browser/sync/engine/syncer_thread_timed_stop.cc (revision 26372)
|
| +++ chrome/browser/sync/engine/syncer_thread_timed_stop.cc (working copy)
|
| @@ -1,9 +1,8 @@
|
| // Copyright (c) 2009 The Chromium Authors. All rights reserved.
|
| // Use of this source code is governed by a BSD-style license that can be
|
| // found in the LICENSE file.
|
| +#include "chrome/browser/sync/engine/syncer_thread_timed_stop.h"
|
|
|
| -#include "chrome/browser/sync/engine/syncer_thread.h"
|
| -
|
| #include "build/build_config.h"
|
|
|
| #ifdef OS_MACOSX
|
| @@ -26,534 +25,95 @@
|
|
|
| using std::priority_queue;
|
| using std::min;
|
| +using base::Time;
|
| +using base::TimeDelta;
|
| +using base::TimeTicks;
|
|
|
| -static inline bool operator < (const timespec& a, const timespec& b) {
|
| - return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec;
|
| -}
|
| -
|
| -namespace {
|
| -
|
| -// Returns the amount of time since the user last interacted with the computer,
|
| -// in milliseconds
|
| -int UserIdleTime() {
|
| -#ifdef OS_WIN
|
| - LASTINPUTINFO last_input_info;
|
| - last_input_info.cbSize = sizeof(LASTINPUTINFO);
|
| -
|
| - // Get time in windows ticks since system start of last activity.
|
| - BOOL b = ::GetLastInputInfo(&last_input_info);
|
| - if (b == TRUE)
|
| - return ::GetTickCount() - last_input_info.dwTime;
|
| -#elif defined(OS_MACOSX)
|
| - // It would be great to do something like:
|
| - //
|
| - // return 1000 *
|
| - // CGEventSourceSecondsSinceLastEventType(
|
| - // kCGEventSourceStateCombinedSessionState,
|
| - // kCGAnyInputEventType);
|
| - //
|
| - // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon
|
| - // and can't link that high up the food chain. Thus this mucking in IOKit.
|
| -
|
| - io_service_t hid_service =
|
| - IOServiceGetMatchingService(kIOMasterPortDefault,
|
| - IOServiceMatching("IOHIDSystem"));
|
| - if (!hid_service) {
|
| - LOG(WARNING) << "Could not obtain IOHIDSystem";
|
| - return 0;
|
| - }
|
| -
|
| - CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service,
|
| - CFSTR("HIDIdleTime"),
|
| - kCFAllocatorDefault,
|
| - 0);
|
| - if (!object) {
|
| - LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property";
|
| - IOObjectRelease(hid_service);
|
| - return 0;
|
| - }
|
| -
|
| - int64 idle_time; // in nanoseconds
|
| - Boolean success;
|
| - if (CFGetTypeID(object) == CFNumberGetTypeID()) {
|
| - success = CFNumberGetValue((CFNumberRef)object,
|
| - kCFNumberSInt64Type,
|
| - &idle_time);
|
| - } else {
|
| - LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!";
|
| - }
|
| -
|
| - CFRelease(object);
|
| - IOObjectRelease(hid_service);
|
| -
|
| - if (!success) {
|
| - LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value";
|
| - return 0;
|
| - } else {
|
| - return idle_time / 1000000; // nano to milli
|
| - }
|
| -#else
|
| - static bool was_logged = false;
|
| - if (!was_logged) {
|
| - was_logged = true;
|
| - LOG(INFO) << "UserIdleTime unimplemented on this platform, "
|
| - "synchronization will not throttle when user idle";
|
| - }
|
| -#endif
|
| -
|
| - return 0;
|
| -}
|
| -
|
| -} // namespace
|
| -
|
| namespace browser_sync {
|
|
|
| -bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) {
|
| - MutexLock lock(&mutex_);
|
| - if (syncer_ == NULL) {
|
| - return false;
|
| - }
|
| - NudgeSyncImpl(milliseconds_from_now, source);
|
| - return true;
|
| -}
|
| -
|
| -void* RunSyncerThread(void* syncer_thread) {
|
| - return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain();
|
| -}
|
| -
|
| -SyncerThread::SyncerThread(
|
| +SyncerThreadTimedStop::SyncerThreadTimedStop(
|
| ClientCommandChannel* command_channel,
|
| syncable::DirectoryManager* mgr,
|
| ServerConnectionManager* connection_manager,
|
| AllStatus* all_status,
|
| ModelSafeWorker* model_safe_worker)
|
| - : dirman_(mgr), scm_(connection_manager),
|
| - syncer_(NULL), syncer_events_(NULL), thread_running_(false),
|
| - syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
|
| - syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
|
| - syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
|
| - syncer_max_interval_(kDefaultMaxPollIntervalMs),
|
| - stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL),
|
| - p2p_authenticated_(false), p2p_subscribed_(false),
|
| - allstatus_(all_status), talk_mediator_hookup_(NULL),
|
| - command_channel_(command_channel), directory_manager_hookup_(NULL),
|
| - model_safe_worker_(model_safe_worker),
|
| - client_command_hookup_(NULL), disable_idle_detection_(false) {
|
| -
|
| - SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE };
|
| - syncer_event_channel_.reset(new SyncerEventChannel(shutdown));
|
| -
|
| - if (dirman_) {
|
| - directory_manager_hookup_.reset(NewEventListenerHookup(
|
| - dirman_->channel(), this, &SyncerThread::HandleDirectoryManagerEvent));
|
| - }
|
| -
|
| - if (scm_) {
|
| - WatchConnectionManager(scm_);
|
| - }
|
| -
|
| - if (command_channel_) {
|
| - WatchClientCommands(command_channel_);
|
| - }
|
| + : SyncerThread(command_channel, mgr, connection_manager, all_status,
|
| + model_safe_worker),
|
| + in_thread_main_loop_(false) {
|
| }
|
|
|
| -SyncerThread::~SyncerThread() {
|
| - client_command_hookup_.reset();
|
| - conn_mgr_hookup_.reset();
|
| - syncer_event_channel_.reset();
|
| - directory_manager_hookup_.reset();
|
| - syncer_events_.reset();
|
| - delete syncer_;
|
| - talk_mediator_hookup_.reset();
|
| - CHECK(!thread_running_);
|
| -}
|
| -
|
| -// Creates and starts a syncer thread.
|
| -// Returns true if it creates a thread or if there's currently a thread running
|
| -// and false otherwise.
|
| -bool SyncerThread::Start() {
|
| - MutexLock lock(&mutex_);
|
| - if (thread_running_) {
|
| - return true;
|
| - }
|
| - thread_running_ =
|
| - (0 == pthread_create(&thread_, NULL, RunSyncerThread, this));
|
| - if (thread_running_) {
|
| - pthread_detach(thread_);
|
| - }
|
| - return thread_running_;
|
| -}
|
| -
|
| // Stop processing. A max wait of at least 2*server RTT time is recommended.
|
| // Returns true if we stopped, false otherwise.
|
| -bool SyncerThread::Stop(int max_wait) {
|
| - MutexLock lock(&mutex_);
|
| - if (!thread_running_)
|
| +bool SyncerThreadTimedStop::Stop(int max_wait) {
|
| + AutoLock lock(lock_);
|
| + // If the thread has been started, then we either already have or are about to
|
| + // enter ThreadMainLoop so we have to proceed with shutdown and wait for it to
|
| + // finish. If the thread has not been started --and we now own the lock--
|
| + // then we can early out because the caller has not called Start().
|
| + if (!thread_.IsRunning())
|
| return true;
|
| - stop_syncer_thread_ = true;
|
| - if (NULL != syncer_) {
|
| +
|
| + LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to "
|
| + << "true (vault_.stop_syncer_thread_)";
|
| + // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit
|
| + // below).
|
| + vault_.stop_syncer_thread_ = true;
|
| + if (NULL != vault_.syncer_) {
|
| // Try to early exit the syncer.
|
| - syncer_->RequestEarlyExit();
|
| + vault_.syncer_->RequestEarlyExit();
|
| }
|
| - pthread_cond_broadcast(&changed_.condvar_);
|
| - timespec deadline = { time(NULL) + (max_wait / 1000), 0 };
|
| - do {
|
| - const int wait_result = max_wait < 0 ?
|
| - pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) :
|
| - pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
|
| - &deadline);
|
| - if (ETIMEDOUT == wait_result) {
|
| - LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely.";
|
| - return false;
|
| - }
|
| - } while (thread_running_);
|
| - return true;
|
| -}
|
|
|
| -void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) {
|
| - PThreadScopedLock<PThreadMutex> lock(&mutex_);
|
| - client_command_hookup_.reset(NewEventListenerHookup(channel, this,
|
| - &SyncerThread::HandleClientCommand));
|
| -}
|
| -
|
| -void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) {
|
| - if (!event) {
|
| - return;
|
| - }
|
| -
|
| - // Mutex not really necessary for these.
|
| - if (event->has_set_sync_poll_interval()) {
|
| - syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval();
|
| - }
|
| -
|
| - if (event->has_set_sync_long_poll_interval()) {
|
| - syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval();
|
| - }
|
| -}
|
| -
|
| -void SyncerThread::ThreadMainLoop() {
|
| - // Use the short poll value by default.
|
| - int poll_seconds = syncer_short_poll_interval_seconds_;
|
| - int user_idle_milliseconds = 0;
|
| - timespec last_sync_time = { 0 };
|
| - bool initial_sync_for_thread = true;
|
| - bool continue_sync_cycle = false;
|
| -
|
| - while (!stop_syncer_thread_) {
|
| - if (!connected_) {
|
| - LOG(INFO) << "Syncer thread waiting for connection.";
|
| - while (!connected_ && !stop_syncer_thread_)
|
| - pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
|
| - LOG_IF(INFO, connected_) << "Syncer thread found connection.";
|
| - continue;
|
| + // stop_syncer_thread_ is now true and the Syncer has been told to exit.
|
| + // We want to wake up all waiters so they can re-examine state. We signal,
|
| + // causing all waiters to try to re-acquire the lock, and then we atomically
|
| + // release the lock and wait. Our wait can be spuriously signaled, so we
|
| + // recalculate the remaining sleep time each time through and re-
|
| + // check the condition before exiting the loop.
|
| + vault_field_changed_.Broadcast();
|
| + TimeTicks start = TimeTicks::Now();
|
| + TimeTicks end = start + TimeDelta::FromMilliseconds(max_wait);
|
| + bool timed_out = false;
|
| + // Eventually the combination of RequestEarlyExit and setting
|
| + // stop_syncer_thread_ to true above will cause in_thread_main_loop_ to become
|
| + // false.
|
| + while (in_thread_main_loop_) {
|
| + TimeDelta sleep_time = end - TimeTicks::Now();
|
| + if (sleep_time < TimeDelta::FromSeconds(0)) {
|
| + timed_out = true;
|
| + break;
|
| }
|
| -
|
| - if (syncer_ == NULL) {
|
| - LOG(INFO) << "Syncer thread waiting for database initialization.";
|
| - while (syncer_ == NULL && !stop_syncer_thread_)
|
| - pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
|
| - LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started.";
|
| - continue;
|
| - }
|
| -
|
| - timespec const next_poll = { last_sync_time.tv_sec + poll_seconds,
|
| - last_sync_time.tv_nsec };
|
| - const timespec wake_time =
|
| - !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ?
|
| - nudge_queue_.top().first : next_poll;
|
| - LOG(INFO) << "wake time is " << wake_time.tv_sec;
|
| - LOG(INFO) << "next poll is " << next_poll.tv_sec;
|
| -
|
| - const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
|
| - &wake_time);
|
| - if (ETIMEDOUT != error) {
|
| - continue; // Check all the conditions again.
|
| - }
|
| -
|
| - const timespec now = GetPThreadAbsoluteTime(0);
|
| -
|
| - // Handle a nudge, caused by either a notification or a local bookmark
|
| - // event. This will also update the source of the following SyncMain call.
|
| - UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread);
|
| -
|
| - LOG(INFO) << "Calling Sync Main at time " << now.tv_sec;
|
| - SyncMain(syncer_);
|
| - last_sync_time = now;
|
| -
|
| - LOG(INFO) << "Updating the next polling time after SyncMain";
|
| - poll_seconds = CalculatePollingWaitTime(allstatus_->status(),
|
| - poll_seconds,
|
| - &user_idle_milliseconds,
|
| - &continue_sync_cycle);
|
| + LOG(INFO) << "Waiting in stop for " << sleep_time.InSeconds() << "s.";
|
| + vault_field_changed_.TimedWait(sleep_time);
|
| }
|
| -}
|
|
|
| -// We check how long the user's been idle and sync less often if the machine is
|
| -// not in use. The aim is to reduce server load.
|
| -int SyncerThread::CalculatePollingWaitTime(
|
| - const AllStatus::Status& status,
|
| - int last_poll_wait, // in s
|
| - int* user_idle_milliseconds,
|
| - bool* continue_sync_cycle) {
|
| - bool is_continuing_sync_cyle = *continue_sync_cycle;
|
| - *continue_sync_cycle = false;
|
| -
|
| - // Determine if the syncer has unfinished work to do from allstatus_.
|
| - const bool syncer_has_work_to_do =
|
| - status.updates_available > status.updates_received
|
| - || status.unsynced_count > 0;
|
| - LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do;
|
| -
|
| - // First calculate the expected wait time, figuring in any backoff because of
|
| - // user idle time. next_wait is in seconds
|
| - syncer_polling_interval_ = (!status.notifications_enabled) ?
|
| - syncer_short_poll_interval_seconds_ :
|
| - syncer_long_poll_interval_seconds_;
|
| - int default_next_wait = syncer_polling_interval_;
|
| - int actual_next_wait = default_next_wait;
|
| -
|
| - if (syncer_has_work_to_do) {
|
| - // Provide exponential backoff due to consecutive errors, else attempt to
|
| - // complete the work as soon as possible.
|
| - if (!is_continuing_sync_cyle) {
|
| - actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0);
|
| - } else {
|
| - actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait);
|
| - }
|
| - *continue_sync_cycle = true;
|
| - } else if (!status.notifications_enabled) {
|
| - // Ensure that we start exponential backoff from our base polling
|
| - // interval when we are not continuing a sync cycle.
|
| - last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_);
|
| -
|
| - // Did the user start interacting with the computer again?
|
| - // If so, revise our idle time (and probably next_sync_time) downwards
|
| - int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime();
|
| - if (new_idle_time < *user_idle_milliseconds) {
|
| - *user_idle_milliseconds = new_idle_time;
|
| - }
|
| - actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000,
|
| - *user_idle_milliseconds) / 1000;
|
| - DCHECK_GE(actual_next_wait, default_next_wait);
|
| + if (timed_out) {
|
| + LOG(ERROR) << "SyncerThread::Stop timed out or error. Problems likely.";
|
| + return false;
|
| }
|
|
|
| - LOG(INFO) << "Sync wait: idle " << default_next_wait
|
| - << " non-idle or backoff " << actual_next_wait << ".";
|
| -
|
| - return actual_next_wait;
|
| + // Stop() should not block on anything at this point, given above madness.
|
| + DLOG(INFO) << "Calling SyncerThread::thread_.Stop() at "
|
| + << Time::Now().ToInternalValue();
|
| + thread_.Stop();
|
| + DLOG(INFO) << "SyncerThread::thread_.Stop() finished at "
|
| + << Time::Now().ToInternalValue();
|
| + return true;
|
| }
|
|
|
| -void* SyncerThread::ThreadMain() {
|
| - NameCurrentThreadForDebugging("SyncEngine_SyncerThread");
|
| - mutex_.Lock();
|
| +void SyncerThreadTimedStop::ThreadMain() {
|
| + AutoLock lock(lock_);
|
| + // Signal Start() to let it know we've made it safely are now running on the
|
| + // message loop, and unblock it's caller.
|
| + thread_main_started_.Signal();
|
| +
|
| + // The only thing that could be waiting on this value is Stop, and we don't
|
| + // release the lock until we're far enough along to Stop safely.
|
| + in_thread_main_loop_ = true;
|
| + vault_field_changed_.Broadcast();
|
| ThreadMainLoop();
|
| - thread_running_ = false;
|
| - pthread_cond_broadcast(&changed_.condvar_);
|
| - mutex_.Unlock();
|
| - LOG(INFO) << "Syncer thread exiting.";
|
| - return 0;
|
| + in_thread_main_loop_ = false;
|
| + vault_field_changed_.Broadcast();
|
| + LOG(INFO) << "Syncer thread ThreadMain is done.";
|
| }
|
|
|
| -void SyncerThread::SyncMain(Syncer* syncer) {
|
| - CHECK(syncer);
|
| - mutex_.Unlock();
|
| - while (syncer->SyncShare()) {
|
| - LOG(INFO) << "Looping in sync share";
|
| - }
|
| - LOG(INFO) << "Done looping in sync share";
|
| -
|
| - mutex_.Lock();
|
| -}
|
| -
|
| -void SyncerThread::UpdateNudgeSource(const timespec& now,
|
| - bool* continue_sync_cycle,
|
| - bool* initial_sync) {
|
| - bool nudged = false;
|
| - NudgeSource nudge_source = kUnknown;
|
| - // Has the previous sync cycle completed?
|
| - if (continue_sync_cycle) {
|
| - nudge_source = kContinuation;
|
| - }
|
| - // Update the nudge source if a new nudge has come through during the
|
| - // previous sync cycle.
|
| - while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) {
|
| - if (!nudged) {
|
| - nudge_source = nudge_queue_.top().second;
|
| - *continue_sync_cycle = false; // Reset the continuation token on nudge.
|
| - nudged = true;
|
| - }
|
| - nudge_queue_.pop();
|
| - }
|
| - SetUpdatesSource(nudged, nudge_source, initial_sync);
|
| -}
|
| -
|
| -void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source,
|
| - bool* initial_sync) {
|
| - sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source =
|
| - sync_pb::GetUpdatesCallerInfo::UNKNOWN;
|
| - if (*initial_sync) {
|
| - updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE;
|
| - *initial_sync = false;
|
| - } else if (!nudged) {
|
| - updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC;
|
| - } else {
|
| - switch (nudge_source) {
|
| - case kNotification:
|
| - updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION;
|
| - break;
|
| - case kLocal:
|
| - updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL;
|
| - break;
|
| - case kContinuation:
|
| - updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
|
| - break;
|
| - case kUnknown:
|
| - default:
|
| - updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN;
|
| - break;
|
| - }
|
| - }
|
| - syncer_->set_updates_source(updates_source);
|
| -}
|
| -
|
| -void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) {
|
| - MutexLock lock(&mutex_);
|
| - channel()->NotifyListeners(event);
|
| - if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
|
| - return;
|
| - }
|
| - NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown);
|
| -}
|
| -
|
| -void SyncerThread::HandleDirectoryManagerEvent(
|
| - const syncable::DirectoryManagerEvent& event) {
|
| - LOG(INFO) << "Handling a directory manager event";
|
| - if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
|
| - MutexLock lock(&mutex_);
|
| - LOG(INFO) << "Syncer starting up for: " << event.dirname;
|
| - // The underlying database structure is ready, and we should create
|
| - // the syncer.
|
| - CHECK(syncer_ == NULL);
|
| - syncer_ =
|
| - new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get());
|
| -
|
| - syncer_->set_command_channel(command_channel_);
|
| - syncer_events_.reset(NewEventListenerHookup(
|
| - syncer_->channel(), this, &SyncerThread::HandleSyncerEvent));
|
| - pthread_cond_broadcast(&changed_.condvar_);
|
| - }
|
| -}
|
| -
|
| -static inline void CheckConnected(bool* connected,
|
| - HttpResponse::ServerConnectionCode code,
|
| - pthread_cond_t* condvar) {
|
| - if (*connected) {
|
| - if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
|
| - *connected = false;
|
| - pthread_cond_broadcast(condvar);
|
| - }
|
| - } else {
|
| - if (HttpResponse::SERVER_CONNECTION_OK == code) {
|
| - *connected = true;
|
| - pthread_cond_broadcast(condvar);
|
| - }
|
| - }
|
| -}
|
| -
|
| -void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) {
|
| - conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
|
| - &SyncerThread::HandleServerConnectionEvent));
|
| - CheckConnected(&connected_, conn_mgr->server_status(),
|
| - &changed_.condvar_);
|
| -}
|
| -
|
| -void SyncerThread::HandleServerConnectionEvent(
|
| - const ServerConnectionEvent& event) {
|
| - if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
|
| - MutexLock lock(&mutex_);
|
| - CheckConnected(&connected_, event.connection_code,
|
| - &changed_.condvar_);
|
| - }
|
| -}
|
| -
|
| -SyncerEventChannel* SyncerThread::channel() {
|
| - return syncer_event_channel_.get();
|
| -}
|
| -
|
| -// Inputs and return value in milliseconds.
|
| -int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) {
|
| - // syncer_polling_interval_ is in seconds
|
| - int syncer_polling_interval_ms = syncer_polling_interval_ * 1000;
|
| -
|
| - // This is our default and lower bound.
|
| - int next_wait = syncer_polling_interval_ms;
|
| -
|
| - // Get idle time, bounded by max wait.
|
| - int idle = min(user_idle_ms, syncer_max_interval_);
|
| -
|
| - // If the user has been idle for a while, we'll start decreasing the poll
|
| - // rate.
|
| - if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) {
|
| - next_wait = std::min(AllStatus::GetRecommendedDelaySeconds(
|
| - last_interval / 1000), syncer_max_interval_ / 1000) * 1000;
|
| - }
|
| -
|
| - return next_wait;
|
| -}
|
| -
|
| -// Called with mutex_ already locked.
|
| -void SyncerThread::NudgeSyncImpl(int milliseconds_from_now,
|
| - NudgeSource source) {
|
| - const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now);
|
| - NudgeObject nudge_object(nudge_time, source);
|
| - nudge_queue_.push(nudge_object);
|
| - pthread_cond_broadcast(&changed_.condvar_);
|
| -}
|
| -
|
| -void SyncerThread::WatchTalkMediator(TalkMediator* mediator) {
|
| - talk_mediator_hookup_.reset(
|
| - NewEventListenerHookup(
|
| - mediator->channel(),
|
| - this,
|
| - &SyncerThread::HandleTalkMediatorEvent));
|
| -}
|
| -
|
| -void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
|
| - MutexLock lock(&mutex_);
|
| - switch (event.what_happened) {
|
| - case TalkMediatorEvent::LOGIN_SUCCEEDED:
|
| - LOG(INFO) << "P2P: Login succeeded.";
|
| - p2p_authenticated_ = true;
|
| - break;
|
| - case TalkMediatorEvent::LOGOUT_SUCCEEDED:
|
| - LOG(INFO) << "P2P: Login succeeded.";
|
| - p2p_authenticated_ = false;
|
| - break;
|
| - case TalkMediatorEvent::SUBSCRIPTIONS_ON:
|
| - LOG(INFO) << "P2P: Subscriptions successfully enabled.";
|
| - p2p_subscribed_ = true;
|
| - if (NULL != syncer_) {
|
| - LOG(INFO) << "Subscriptions on. Nudging syncer for initial push.";
|
| - NudgeSyncImpl(0, kLocal);
|
| - }
|
| - break;
|
| - case TalkMediatorEvent::SUBSCRIPTIONS_OFF:
|
| - LOG(INFO) << "P2P: Subscriptions are not enabled.";
|
| - p2p_subscribed_ = false;
|
| - break;
|
| - case TalkMediatorEvent::NOTIFICATION_RECEIVED:
|
| - LOG(INFO) << "P2P: Updates on server, pushing syncer";
|
| - if (NULL != syncer_) {
|
| - NudgeSyncImpl(0, kNotification);
|
| - }
|
| - break;
|
| - default:
|
| - break;
|
| - }
|
| -
|
| - if (NULL != syncer_) {
|
| - syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_);
|
| - }
|
| -}
|
| -
|
| -} // namespace browser_sync
|
| +} // namespace browser_sync
|
|
|