Index: chrome/browser/sync/engine/syncer_thread_pthreads.cc |
=================================================================== |
--- chrome/browser/sync/engine/syncer_thread_pthreads.cc (revision 27123) |
+++ chrome/browser/sync/engine/syncer_thread_pthreads.cc (working copy) |
@@ -1,578 +0,0 @@ |
-// Copyright (c) 2009 The Chromium Authors. All rights reserved. |
-// Use of this source code is governed by a BSD-style license that can be |
-// found in the LICENSE file. |
-#include "chrome/browser/sync/engine/syncer_thread_pthreads.h" |
- |
-#include "build/build_config.h" |
- |
-#ifdef OS_MACOSX |
-#include <CoreFoundation/CFNumber.h> |
-#include <IOKit/IOTypes.h> |
-#include <IOKit/IOKitLib.h> |
-#endif |
- |
-#include <algorithm> |
-#include <map> |
-#include <queue> |
- |
-#include "chrome/browser/sync/engine/auth_watcher.h" |
-#include "chrome/browser/sync/engine/model_safe_worker.h" |
-#include "chrome/browser/sync/engine/net/server_connection_manager.h" |
-#include "chrome/browser/sync/engine/syncer.h" |
-#include "chrome/browser/sync/notifier/listener/talk_mediator.h" |
-#include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" |
-#include "chrome/browser/sync/syncable/directory_manager.h" |
- |
-using std::priority_queue; |
-using std::min; |
- |
-static inline bool operator < (const timespec& a, const timespec& b) { |
- return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec; |
-} |
- |
-namespace { |
- |
-// Returns the amount of time since the user last interacted with the computer, |
-// in milliseconds |
-int UserIdleTime() { |
-#ifdef OS_WIN |
- LASTINPUTINFO last_input_info; |
- last_input_info.cbSize = sizeof(LASTINPUTINFO); |
- |
- // Get time in windows ticks since system start of last activity. |
- BOOL b = ::GetLastInputInfo(&last_input_info); |
- if (b == TRUE) |
- return ::GetTickCount() - last_input_info.dwTime; |
-#elif defined(OS_MACOSX) |
- // It would be great to do something like: |
- // |
- // return 1000 * |
- // CGEventSourceSecondsSinceLastEventType( |
- // kCGEventSourceStateCombinedSessionState, |
- // kCGAnyInputEventType); |
- // |
- // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon |
- // and can't link that high up the food chain. Thus this mucking in IOKit. |
- |
- io_service_t hid_service = |
- IOServiceGetMatchingService(kIOMasterPortDefault, |
- IOServiceMatching("IOHIDSystem")); |
- if (!hid_service) { |
- LOG(WARNING) << "Could not obtain IOHIDSystem"; |
- return 0; |
- } |
- |
- CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service, |
- CFSTR("HIDIdleTime"), |
- kCFAllocatorDefault, |
- 0); |
- if (!object) { |
- LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property"; |
- IOObjectRelease(hid_service); |
- return 0; |
- } |
- |
- int64 idle_time; // in nanoseconds |
- Boolean success; |
- if (CFGetTypeID(object) == CFNumberGetTypeID()) { |
- success = CFNumberGetValue((CFNumberRef)object, |
- kCFNumberSInt64Type, |
- &idle_time); |
- } else { |
- LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!"; |
- } |
- |
- CFRelease(object); |
- IOObjectRelease(hid_service); |
- |
- if (!success) { |
- LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value"; |
- return 0; |
- } else { |
- return idle_time / 1000000; // nano to milli |
- } |
-#else |
- static bool was_logged = false; |
- if (!was_logged) { |
- was_logged = true; |
- LOG(INFO) << "UserIdleTime unimplemented on this platform, " |
- "synchronization will not throttle when user idle"; |
- } |
-#endif |
- |
- return 0; |
-} |
- |
-} // namespace |
- |
-namespace browser_sync { |
- |
-SyncerThreadPthreads::SyncerThreadPthreads( |
- ClientCommandChannel* command_channel, |
- syncable::DirectoryManager* mgr, |
- ServerConnectionManager* connection_manager, |
- AllStatus* all_status, ModelSafeWorker* model_safe_worker) |
- : SyncerThread() { |
- impl_.reset(new SyncerThreadPthreadImpl(command_channel, mgr, |
- connection_manager, all_status, model_safe_worker)); |
-} |
- |
-bool SyncerThreadPthreadImpl::NudgeSyncer(int milliseconds_from_now, |
- SyncerThread::NudgeSource source) { |
- MutexLock lock(&mutex_); |
- if (syncer_ == NULL) { |
- return false; |
- } |
- NudgeSyncImpl(milliseconds_from_now, source); |
- return true; |
-} |
- |
-void* RunSyncerThread(void* syncer_thread) { |
- return (reinterpret_cast<SyncerThreadPthreadImpl*>( |
- syncer_thread))->ThreadMain(); |
-} |
- |
-SyncerThreadPthreadImpl::SyncerThreadPthreadImpl( |
- ClientCommandChannel* command_channel, |
- syncable::DirectoryManager* mgr, |
- ServerConnectionManager* connection_manager, |
- AllStatus* all_status, |
- ModelSafeWorker* model_safe_worker) |
- : dirman_(mgr), scm_(connection_manager), |
- syncer_(NULL), syncer_events_(NULL), thread_running_(false), |
- syncer_short_poll_interval_seconds_( |
- SyncerThread::kDefaultShortPollIntervalSeconds), |
- syncer_long_poll_interval_seconds_( |
- SyncerThread::kDefaultLongPollIntervalSeconds), |
- syncer_polling_interval_(SyncerThread::kDefaultShortPollIntervalSeconds), |
- syncer_max_interval_(SyncerThread::kDefaultMaxPollIntervalMs), |
- stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL), |
- p2p_authenticated_(false), p2p_subscribed_(false), |
- allstatus_(all_status), talk_mediator_hookup_(NULL), |
- command_channel_(command_channel), directory_manager_hookup_(NULL), |
- model_safe_worker_(model_safe_worker), |
- client_command_hookup_(NULL), disable_idle_detection_(false) { |
- |
- SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE }; |
- syncer_event_channel_.reset(new SyncerEventChannel(shutdown)); |
- |
- if (dirman_) { |
- directory_manager_hookup_.reset(NewEventListenerHookup( |
- dirman_->channel(), this, |
- &SyncerThreadPthreadImpl::HandleDirectoryManagerEvent)); |
- } |
- |
- if (scm_) { |
- WatchConnectionManager(scm_); |
- } |
- |
- if (command_channel_) { |
- WatchClientCommands(command_channel_); |
- } |
-} |
- |
-SyncerThreadPthreadImpl::~SyncerThreadPthreadImpl() { |
- client_command_hookup_.reset(); |
- conn_mgr_hookup_.reset(); |
- syncer_event_channel_.reset(); |
- directory_manager_hookup_.reset(); |
- syncer_events_.reset(); |
- delete syncer_; |
- talk_mediator_hookup_.reset(); |
- CHECK(!thread_running_); |
-} |
- |
-// Creates and starts a syncer thread. |
-// Returns true if it creates a thread or if there's currently a thread running |
-// and false otherwise. |
-bool SyncerThreadPthreadImpl::Start() { |
- MutexLock lock(&mutex_); |
- if (thread_running_) { |
- return true; |
- } |
- thread_running_ = |
- (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); |
- if (thread_running_) { |
- pthread_detach(thread_); |
- } |
- return thread_running_; |
-} |
- |
-// Stop processing. A max wait of at least 2*server RTT time is recommended. |
-// Returns true if we stopped, false otherwise. |
-bool SyncerThreadPthreadImpl::Stop(int max_wait) { |
- MutexLock lock(&mutex_); |
- if (!thread_running_) |
- return true; |
- stop_syncer_thread_ = true; |
- if (NULL != syncer_) { |
- // Try to early exit the syncer. |
- syncer_->RequestEarlyExit(); |
- } |
- pthread_cond_broadcast(&changed_.condvar_); |
- timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; |
- do { |
- const int wait_result = max_wait < 0 ? |
- pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : |
- pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, |
- &deadline); |
- if (ETIMEDOUT == wait_result) { |
- LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; |
- return false; |
- } |
- } while (thread_running_); |
- return true; |
-} |
- |
-void SyncerThreadPthreadImpl::WatchClientCommands( |
- ClientCommandChannel* channel) { |
- PThreadScopedLock<PThreadMutex> lock(&mutex_); |
- client_command_hookup_.reset(NewEventListenerHookup(channel, this, |
- &SyncerThreadPthreadImpl::HandleClientCommand)); |
-} |
- |
-void SyncerThreadPthreadImpl::HandleClientCommand( |
- ClientCommandChannel::EventType event) { |
- if (!event) { |
- return; |
- } |
- |
- // Mutex not really necessary for these. |
- if (event->has_set_sync_poll_interval()) { |
- syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval(); |
- } |
- |
- if (event->has_set_sync_long_poll_interval()) { |
- syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval(); |
- } |
-} |
- |
-void SyncerThreadPthreadImpl::ThreadMainLoop() { |
- // Use the short poll value by default. |
- int poll_seconds = syncer_short_poll_interval_seconds_; |
- int user_idle_milliseconds = 0; |
- timespec last_sync_time = { 0 }; |
- bool initial_sync_for_thread = true; |
- bool continue_sync_cycle = false; |
- |
- while (!stop_syncer_thread_) { |
- if (!connected_) { |
- LOG(INFO) << "Syncer thread waiting for connection."; |
- while (!connected_ && !stop_syncer_thread_) |
- pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); |
- LOG_IF(INFO, connected_) << "Syncer thread found connection."; |
- continue; |
- } |
- |
- if (syncer_ == NULL) { |
- LOG(INFO) << "Syncer thread waiting for database initialization."; |
- while (syncer_ == NULL && !stop_syncer_thread_) |
- pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); |
- LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started."; |
- continue; |
- } |
- |
- timespec const next_poll = { last_sync_time.tv_sec + poll_seconds, |
- last_sync_time.tv_nsec }; |
- const timespec wake_time = |
- !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ? |
- nudge_queue_.top().first : next_poll; |
- LOG(INFO) << "wake time is " << wake_time.tv_sec; |
- LOG(INFO) << "next poll is " << next_poll.tv_sec; |
- |
- const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, |
- &wake_time); |
- if (ETIMEDOUT != error) { |
- continue; // Check all the conditions again. |
- } |
- |
- const timespec now = GetPThreadAbsoluteTime(0); |
- |
- // Handle a nudge, caused by either a notification or a local bookmark |
- // event. This will also update the source of the following SyncMain call. |
- UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread); |
- |
- LOG(INFO) << "Calling Sync Main at time " << now.tv_sec; |
- SyncMain(syncer_); |
- last_sync_time = now; |
- |
- LOG(INFO) << "Updating the next polling time after SyncMain"; |
- poll_seconds = CalculatePollingWaitTime(allstatus_->status(), |
- poll_seconds, |
- &user_idle_milliseconds, |
- &continue_sync_cycle); |
- } |
-} |
- |
-// We check how long the user's been idle and sync less often if the machine is |
-// not in use. The aim is to reduce server load. |
-int SyncerThreadPthreadImpl::CalculatePollingWaitTime( |
- const AllStatus::Status& status, |
- int last_poll_wait, // in s |
- int* user_idle_milliseconds, |
- bool* continue_sync_cycle) { |
- bool is_continuing_sync_cyle = *continue_sync_cycle; |
- *continue_sync_cycle = false; |
- |
- // Determine if the syncer has unfinished work to do from allstatus_. |
- const bool syncer_has_work_to_do = |
- status.updates_available > status.updates_received |
- || status.unsynced_count > 0; |
- LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do; |
- |
- // First calculate the expected wait time, figuring in any backoff because of |
- // user idle time. next_wait is in seconds |
- syncer_polling_interval_ = (!status.notifications_enabled) ? |
- syncer_short_poll_interval_seconds_ : |
- syncer_long_poll_interval_seconds_; |
- int default_next_wait = syncer_polling_interval_; |
- int actual_next_wait = default_next_wait; |
- |
- if (syncer_has_work_to_do) { |
- // Provide exponential backoff due to consecutive errors, else attempt to |
- // complete the work as soon as possible. |
- if (!is_continuing_sync_cyle) { |
- actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0); |
- } else { |
- actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait); |
- } |
- *continue_sync_cycle = true; |
- } else if (!status.notifications_enabled) { |
- // Ensure that we start exponential backoff from our base polling |
- // interval when we are not continuing a sync cycle. |
- last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_); |
- |
- // Did the user start interacting with the computer again? |
- // If so, revise our idle time (and probably next_sync_time) downwards |
- int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime(); |
- if (new_idle_time < *user_idle_milliseconds) { |
- *user_idle_milliseconds = new_idle_time; |
- } |
- actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000, |
- *user_idle_milliseconds) / 1000; |
- DCHECK_GE(actual_next_wait, default_next_wait); |
- } |
- |
- LOG(INFO) << "Sync wait: idle " << default_next_wait |
- << " non-idle or backoff " << actual_next_wait << "."; |
- |
- return actual_next_wait; |
-} |
- |
-void* SyncerThreadPthreadImpl::ThreadMain() { |
- NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); |
- mutex_.Lock(); |
- ThreadMainLoop(); |
- thread_running_ = false; |
- pthread_cond_broadcast(&changed_.condvar_); |
- mutex_.Unlock(); |
- LOG(INFO) << "Syncer thread exiting."; |
- return 0; |
-} |
- |
-void SyncerThreadPthreadImpl::SyncMain(Syncer* syncer) { |
- CHECK(syncer); |
- mutex_.Unlock(); |
- while (syncer->SyncShare()) { |
- LOG(INFO) << "Looping in sync share"; |
- } |
- LOG(INFO) << "Done looping in sync share"; |
- |
- mutex_.Lock(); |
-} |
- |
-void SyncerThreadPthreadImpl::UpdateNudgeSource(const timespec& now, |
- bool* continue_sync_cycle, |
- bool* initial_sync) { |
- bool nudged = false; |
- SyncerThread::NudgeSource nudge_source = SyncerThread::kUnknown; |
- // Has the previous sync cycle completed? |
- if (continue_sync_cycle) { |
- nudge_source = SyncerThread::kContinuation; |
- } |
- // Update the nudge source if a new nudge has come through during the |
- // previous sync cycle. |
- while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { |
- if (!nudged) { |
- nudge_source = nudge_queue_.top().second; |
- *continue_sync_cycle = false; // Reset the continuation token on nudge. |
- nudged = true; |
- } |
- nudge_queue_.pop(); |
- } |
- SetUpdatesSource(nudged, nudge_source, initial_sync); |
-} |
- |
-void SyncerThreadPthreadImpl::SetUpdatesSource(bool nudged, |
- SyncerThread::NudgeSource nudge_source, bool* initial_sync) { |
- sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source = |
- sync_pb::GetUpdatesCallerInfo::UNKNOWN; |
- if (*initial_sync) { |
- updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE; |
- *initial_sync = false; |
- } else if (!nudged) { |
- updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC; |
- } else { |
- switch (nudge_source) { |
- case SyncerThread::kNotification: |
- updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION; |
- break; |
- case SyncerThread::kLocal: |
- updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL; |
- break; |
- case SyncerThread::kContinuation: |
- updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION; |
- break; |
- case SyncerThread::kUnknown: |
- default: |
- updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN; |
- break; |
- } |
- } |
- syncer_->set_updates_source(updates_source); |
-} |
- |
-void SyncerThreadPthreadImpl::HandleSyncerEvent(const SyncerEvent& event) { |
- MutexLock lock(&mutex_); |
- channel()->NotifyListeners(event); |
- if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { |
- return; |
- } |
- NudgeSyncImpl(event.nudge_delay_milliseconds, SyncerThread::kUnknown); |
-} |
- |
-void SyncerThreadPthreadImpl::HandleDirectoryManagerEvent( |
- const syncable::DirectoryManagerEvent& event) { |
- LOG(INFO) << "Handling a directory manager event"; |
- if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { |
- MutexLock lock(&mutex_); |
- LOG(INFO) << "Syncer starting up for: " << event.dirname; |
- // The underlying database structure is ready, and we should create |
- // the syncer. |
- CHECK(syncer_ == NULL); |
- syncer_ = |
- new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); |
- |
- syncer_->set_command_channel(command_channel_); |
- syncer_events_.reset(NewEventListenerHookup( |
- syncer_->channel(), this, &SyncerThreadPthreadImpl::HandleSyncerEvent)); |
- pthread_cond_broadcast(&changed_.condvar_); |
- } |
-} |
- |
-static inline void CheckConnected(bool* connected, |
- HttpResponse::ServerConnectionCode code, |
- pthread_cond_t* condvar) { |
- if (*connected) { |
- if (HttpResponse::CONNECTION_UNAVAILABLE == code) { |
- *connected = false; |
- pthread_cond_broadcast(condvar); |
- } |
- } else { |
- if (HttpResponse::SERVER_CONNECTION_OK == code) { |
- *connected = true; |
- pthread_cond_broadcast(condvar); |
- } |
- } |
-} |
- |
-void SyncerThreadPthreadImpl::WatchConnectionManager( |
- ServerConnectionManager* conn_mgr) { |
- conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, |
- &SyncerThreadPthreadImpl::HandleServerConnectionEvent)); |
- CheckConnected(&connected_, conn_mgr->server_status(), |
- &changed_.condvar_); |
-} |
- |
-void SyncerThreadPthreadImpl::HandleServerConnectionEvent( |
- const ServerConnectionEvent& event) { |
- if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { |
- MutexLock lock(&mutex_); |
- CheckConnected(&connected_, event.connection_code, |
- &changed_.condvar_); |
- } |
-} |
- |
-SyncerEventChannel* SyncerThreadPthreadImpl::channel() { |
- return syncer_event_channel_.get(); |
-} |
- |
-// Inputs and return value in milliseconds. |
-int SyncerThreadPthreadImpl::CalculateSyncWaitTime(int last_interval, |
- int user_idle_ms) { |
- // syncer_polling_interval_ is in seconds |
- int syncer_polling_interval_ms = syncer_polling_interval_ * 1000; |
- |
- // This is our default and lower bound. |
- int next_wait = syncer_polling_interval_ms; |
- |
- // Get idle time, bounded by max wait. |
- int idle = min(user_idle_ms, syncer_max_interval_); |
- |
- // If the user has been idle for a while, we'll start decreasing the poll |
- // rate. |
- if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) { |
- next_wait = std::min(AllStatus::GetRecommendedDelaySeconds( |
- last_interval / 1000), syncer_max_interval_ / 1000) * 1000; |
- } |
- |
- return next_wait; |
-} |
- |
-// Called with mutex_ already locked. |
-void SyncerThreadPthreadImpl::NudgeSyncImpl(int milliseconds_from_now, |
- SyncerThread::NudgeSource source) { |
- const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); |
- NudgeObject nudge_object(nudge_time, source); |
- nudge_queue_.push(nudge_object); |
- pthread_cond_broadcast(&changed_.condvar_); |
-} |
- |
-void SyncerThreadPthreadImpl::WatchTalkMediator(TalkMediator* mediator) { |
- talk_mediator_hookup_.reset( |
- NewEventListenerHookup( |
- mediator->channel(), |
- this, |
- &SyncerThreadPthreadImpl::HandleTalkMediatorEvent)); |
-} |
- |
-void SyncerThreadPthreadImpl::HandleTalkMediatorEvent( |
- const TalkMediatorEvent& event) { |
- MutexLock lock(&mutex_); |
- switch (event.what_happened) { |
- case TalkMediatorEvent::LOGIN_SUCCEEDED: |
- LOG(INFO) << "P2P: Login succeeded."; |
- p2p_authenticated_ = true; |
- break; |
- case TalkMediatorEvent::LOGOUT_SUCCEEDED: |
- LOG(INFO) << "P2P: Login succeeded."; |
- p2p_authenticated_ = false; |
- break; |
- case TalkMediatorEvent::SUBSCRIPTIONS_ON: |
- LOG(INFO) << "P2P: Subscriptions successfully enabled."; |
- p2p_subscribed_ = true; |
- if (NULL != syncer_) { |
- LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; |
- NudgeSyncImpl(0, SyncerThread::kLocal); |
- } |
- break; |
- case TalkMediatorEvent::SUBSCRIPTIONS_OFF: |
- LOG(INFO) << "P2P: Subscriptions are not enabled."; |
- p2p_subscribed_ = false; |
- break; |
- case TalkMediatorEvent::NOTIFICATION_RECEIVED: |
- LOG(INFO) << "P2P: Updates on server, pushing syncer"; |
- if (NULL != syncer_) { |
- NudgeSyncImpl(0, SyncerThread::kNotification); |
- } |
- break; |
- default: |
- break; |
- } |
- |
- if (NULL != syncer_) { |
- syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); |
- } |
-} |
- |
-} // namespace browser_sync |