Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(268)

Unified Diff: chrome/browser/sync/engine/syncer_thread_timed_stop.cc

Issue 214033: Use chrome/base synchronization primitives and threads instead of... (Closed) Base URL: svn://chrome-svn/chrome/trunk/src/
Patch Set: '' Created 11 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: chrome/browser/sync/engine/syncer_thread_timed_stop.cc
===================================================================
--- chrome/browser/sync/engine/syncer_thread_timed_stop.cc (revision 26372)
+++ chrome/browser/sync/engine/syncer_thread_timed_stop.cc (working copy)
@@ -1,9 +1,8 @@
// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
+#include "chrome/browser/sync/engine/syncer_thread_timed_stop.h"
-#include "chrome/browser/sync/engine/syncer_thread.h"
-
#include "build/build_config.h"
#ifdef OS_MACOSX
@@ -26,534 +25,95 @@
using std::priority_queue;
using std::min;
+using base::Time;
+using base::TimeDelta;
+using base::TimeTicks;
-static inline bool operator < (const timespec& a, const timespec& b) {
- return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec;
-}
-
-namespace {
-
-// Returns the amount of time since the user last interacted with the computer,
-// in milliseconds
-int UserIdleTime() {
-#ifdef OS_WIN
- LASTINPUTINFO last_input_info;
- last_input_info.cbSize = sizeof(LASTINPUTINFO);
-
- // Get time in windows ticks since system start of last activity.
- BOOL b = ::GetLastInputInfo(&last_input_info);
- if (b == TRUE)
- return ::GetTickCount() - last_input_info.dwTime;
-#elif defined(OS_MACOSX)
- // It would be great to do something like:
- //
- // return 1000 *
- // CGEventSourceSecondsSinceLastEventType(
- // kCGEventSourceStateCombinedSessionState,
- // kCGAnyInputEventType);
- //
- // Unfortunately, CGEvent* lives in ApplicationServices, and we're a daemon
- // and can't link that high up the food chain. Thus this mucking in IOKit.
-
- io_service_t hid_service =
- IOServiceGetMatchingService(kIOMasterPortDefault,
- IOServiceMatching("IOHIDSystem"));
- if (!hid_service) {
- LOG(WARNING) << "Could not obtain IOHIDSystem";
- return 0;
- }
-
- CFTypeRef object = IORegistryEntryCreateCFProperty(hid_service,
- CFSTR("HIDIdleTime"),
- kCFAllocatorDefault,
- 0);
- if (!object) {
- LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property";
- IOObjectRelease(hid_service);
- return 0;
- }
-
- int64 idle_time; // in nanoseconds
- Boolean success;
- if (CFGetTypeID(object) == CFNumberGetTypeID()) {
- success = CFNumberGetValue((CFNumberRef)object,
- kCFNumberSInt64Type,
- &idle_time);
- } else {
- LOG(WARNING) << "IOHIDSystem's HIDIdleTime property isn't a number!";
- }
-
- CFRelease(object);
- IOObjectRelease(hid_service);
-
- if (!success) {
- LOG(WARNING) << "Could not get IOHIDSystem's HIDIdleTime property's value";
- return 0;
- } else {
- return idle_time / 1000000; // nano to milli
- }
-#else
- static bool was_logged = false;
- if (!was_logged) {
- was_logged = true;
- LOG(INFO) << "UserIdleTime unimplemented on this platform, "
- "synchronization will not throttle when user idle";
- }
-#endif
-
- return 0;
-}
-
-} // namespace
-
namespace browser_sync {
-bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) {
- MutexLock lock(&mutex_);
- if (syncer_ == NULL) {
- return false;
- }
- NudgeSyncImpl(milliseconds_from_now, source);
- return true;
-}
-
-void* RunSyncerThread(void* syncer_thread) {
- return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain();
-}
-
-SyncerThread::SyncerThread(
+SyncerThreadTimedStop::SyncerThreadTimedStop(
ClientCommandChannel* command_channel,
syncable::DirectoryManager* mgr,
ServerConnectionManager* connection_manager,
AllStatus* all_status,
ModelSafeWorker* model_safe_worker)
- : dirman_(mgr), scm_(connection_manager),
- syncer_(NULL), syncer_events_(NULL), thread_running_(false),
- syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
- syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
- syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
- syncer_max_interval_(kDefaultMaxPollIntervalMs),
- stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL),
- p2p_authenticated_(false), p2p_subscribed_(false),
- allstatus_(all_status), talk_mediator_hookup_(NULL),
- command_channel_(command_channel), directory_manager_hookup_(NULL),
- model_safe_worker_(model_safe_worker),
- client_command_hookup_(NULL), disable_idle_detection_(false) {
-
- SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE };
- syncer_event_channel_.reset(new SyncerEventChannel(shutdown));
-
- if (dirman_) {
- directory_manager_hookup_.reset(NewEventListenerHookup(
- dirman_->channel(), this, &SyncerThread::HandleDirectoryManagerEvent));
- }
-
- if (scm_) {
- WatchConnectionManager(scm_);
- }
-
- if (command_channel_) {
- WatchClientCommands(command_channel_);
- }
+ : SyncerThread(command_channel, mgr, connection_manager, all_status,
+ model_safe_worker),
+ in_thread_main_loop_(false) {
}
-SyncerThread::~SyncerThread() {
- client_command_hookup_.reset();
- conn_mgr_hookup_.reset();
- syncer_event_channel_.reset();
- directory_manager_hookup_.reset();
- syncer_events_.reset();
- delete syncer_;
- talk_mediator_hookup_.reset();
- CHECK(!thread_running_);
-}
-
-// Creates and starts a syncer thread.
-// Returns true if it creates a thread or if there's currently a thread running
-// and false otherwise.
-bool SyncerThread::Start() {
- MutexLock lock(&mutex_);
- if (thread_running_) {
- return true;
- }
- thread_running_ =
- (0 == pthread_create(&thread_, NULL, RunSyncerThread, this));
- if (thread_running_) {
- pthread_detach(thread_);
- }
- return thread_running_;
-}
-
// Stop processing. A max wait of at least 2*server RTT time is recommended.
// Returns true if we stopped, false otherwise.
-bool SyncerThread::Stop(int max_wait) {
- MutexLock lock(&mutex_);
- if (!thread_running_)
+bool SyncerThreadTimedStop::Stop(int max_wait) {
+ AutoLock lock(lock_);
+ // If the thread has been started, then we either already have or are about to
+ // enter ThreadMainLoop so we have to proceed with shutdown and wait for it to
+ // finish. If the thread has not been started --and we now own the lock--
+ // then we can early out because the caller has not called Start().
+ if (!thread_.IsRunning())
return true;
- stop_syncer_thread_ = true;
- if (NULL != syncer_) {
+
+ LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to "
+ << "true (vault_.stop_syncer_thread_)";
+ // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit
+ // below).
+ vault_.stop_syncer_thread_ = true;
+ if (NULL != vault_.syncer_) {
// Try to early exit the syncer.
- syncer_->RequestEarlyExit();
+ vault_.syncer_->RequestEarlyExit();
}
- pthread_cond_broadcast(&changed_.condvar_);
- timespec deadline = { time(NULL) + (max_wait / 1000), 0 };
- do {
- const int wait_result = max_wait < 0 ?
- pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) :
- pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
- &deadline);
- if (ETIMEDOUT == wait_result) {
- LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely.";
- return false;
- }
- } while (thread_running_);
- return true;
-}
-void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) {
- PThreadScopedLock<PThreadMutex> lock(&mutex_);
- client_command_hookup_.reset(NewEventListenerHookup(channel, this,
- &SyncerThread::HandleClientCommand));
-}
-
-void SyncerThread::HandleClientCommand(ClientCommandChannel::EventType event) {
- if (!event) {
- return;
- }
-
- // Mutex not really necessary for these.
- if (event->has_set_sync_poll_interval()) {
- syncer_short_poll_interval_seconds_ = event->set_sync_poll_interval();
- }
-
- if (event->has_set_sync_long_poll_interval()) {
- syncer_long_poll_interval_seconds_ = event->set_sync_long_poll_interval();
- }
-}
-
-void SyncerThread::ThreadMainLoop() {
- // Use the short poll value by default.
- int poll_seconds = syncer_short_poll_interval_seconds_;
- int user_idle_milliseconds = 0;
- timespec last_sync_time = { 0 };
- bool initial_sync_for_thread = true;
- bool continue_sync_cycle = false;
-
- while (!stop_syncer_thread_) {
- if (!connected_) {
- LOG(INFO) << "Syncer thread waiting for connection.";
- while (!connected_ && !stop_syncer_thread_)
- pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
- LOG_IF(INFO, connected_) << "Syncer thread found connection.";
- continue;
+ // stop_syncer_thread_ is now true and the Syncer has been told to exit.
+ // We want to wake up all waiters so they can re-examine state. We signal,
+ // causing all waiters to try to re-acquire the lock, and then we atomically
+ // release the lock and wait. Our wait can be spuriously signaled, so we
+ // recalculate the remaining sleep time each time through and re-
+ // check the condition before exiting the loop.
+ vault_field_changed_.Broadcast();
+ TimeTicks start = TimeTicks::Now();
+ TimeTicks end = start + TimeDelta::FromMilliseconds(max_wait);
+ bool timed_out = false;
+ // Eventually the combination of RequestEarlyExit and setting
+ // stop_syncer_thread_ to true above will cause in_thread_main_loop_ to become
+ // false.
+ while (in_thread_main_loop_) {
+ TimeDelta sleep_time = end - TimeTicks::Now();
+ if (sleep_time < TimeDelta::FromSeconds(0)) {
+ timed_out = true;
+ break;
}
-
- if (syncer_ == NULL) {
- LOG(INFO) << "Syncer thread waiting for database initialization.";
- while (syncer_ == NULL && !stop_syncer_thread_)
- pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
- LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started.";
- continue;
- }
-
- timespec const next_poll = { last_sync_time.tv_sec + poll_seconds,
- last_sync_time.tv_nsec };
- const timespec wake_time =
- !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ?
- nudge_queue_.top().first : next_poll;
- LOG(INFO) << "wake time is " << wake_time.tv_sec;
- LOG(INFO) << "next poll is " << next_poll.tv_sec;
-
- const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
- &wake_time);
- if (ETIMEDOUT != error) {
- continue; // Check all the conditions again.
- }
-
- const timespec now = GetPThreadAbsoluteTime(0);
-
- // Handle a nudge, caused by either a notification or a local bookmark
- // event. This will also update the source of the following SyncMain call.
- UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread);
-
- LOG(INFO) << "Calling Sync Main at time " << now.tv_sec;
- SyncMain(syncer_);
- last_sync_time = now;
-
- LOG(INFO) << "Updating the next polling time after SyncMain";
- poll_seconds = CalculatePollingWaitTime(allstatus_->status(),
- poll_seconds,
- &user_idle_milliseconds,
- &continue_sync_cycle);
+ LOG(INFO) << "Waiting in stop for " << sleep_time.InSeconds() << "s.";
+ vault_field_changed_.TimedWait(sleep_time);
}
-}
-// We check how long the user's been idle and sync less often if the machine is
-// not in use. The aim is to reduce server load.
-int SyncerThread::CalculatePollingWaitTime(
- const AllStatus::Status& status,
- int last_poll_wait, // in s
- int* user_idle_milliseconds,
- bool* continue_sync_cycle) {
- bool is_continuing_sync_cyle = *continue_sync_cycle;
- *continue_sync_cycle = false;
-
- // Determine if the syncer has unfinished work to do from allstatus_.
- const bool syncer_has_work_to_do =
- status.updates_available > status.updates_received
- || status.unsynced_count > 0;
- LOG(INFO) << "syncer_has_work_to_do is " << syncer_has_work_to_do;
-
- // First calculate the expected wait time, figuring in any backoff because of
- // user idle time. next_wait is in seconds
- syncer_polling_interval_ = (!status.notifications_enabled) ?
- syncer_short_poll_interval_seconds_ :
- syncer_long_poll_interval_seconds_;
- int default_next_wait = syncer_polling_interval_;
- int actual_next_wait = default_next_wait;
-
- if (syncer_has_work_to_do) {
- // Provide exponential backoff due to consecutive errors, else attempt to
- // complete the work as soon as possible.
- if (!is_continuing_sync_cyle) {
- actual_next_wait = AllStatus::GetRecommendedDelaySeconds(0);
- } else {
- actual_next_wait = AllStatus::GetRecommendedDelaySeconds(last_poll_wait);
- }
- *continue_sync_cycle = true;
- } else if (!status.notifications_enabled) {
- // Ensure that we start exponential backoff from our base polling
- // interval when we are not continuing a sync cycle.
- last_poll_wait = std::max(last_poll_wait, syncer_polling_interval_);
-
- // Did the user start interacting with the computer again?
- // If so, revise our idle time (and probably next_sync_time) downwards
- int new_idle_time = disable_idle_detection_ ? 0 : UserIdleTime();
- if (new_idle_time < *user_idle_milliseconds) {
- *user_idle_milliseconds = new_idle_time;
- }
- actual_next_wait = CalculateSyncWaitTime(last_poll_wait * 1000,
- *user_idle_milliseconds) / 1000;
- DCHECK_GE(actual_next_wait, default_next_wait);
+ if (timed_out) {
+ LOG(ERROR) << "SyncerThread::Stop timed out or error. Problems likely.";
+ return false;
}
- LOG(INFO) << "Sync wait: idle " << default_next_wait
- << " non-idle or backoff " << actual_next_wait << ".";
-
- return actual_next_wait;
+ // Stop() should not block on anything at this point, given above madness.
+ DLOG(INFO) << "Calling SyncerThread::thread_.Stop() at "
+ << Time::Now().ToInternalValue();
+ thread_.Stop();
+ DLOG(INFO) << "SyncerThread::thread_.Stop() finished at "
+ << Time::Now().ToInternalValue();
+ return true;
}
-void* SyncerThread::ThreadMain() {
- NameCurrentThreadForDebugging("SyncEngine_SyncerThread");
- mutex_.Lock();
+void SyncerThreadTimedStop::ThreadMain() {
+ AutoLock lock(lock_);
+ // Signal Start() to let it know we've made it safely are now running on the
+ // message loop, and unblock it's caller.
+ thread_main_started_.Signal();
+
+ // The only thing that could be waiting on this value is Stop, and we don't
+ // release the lock until we're far enough along to Stop safely.
+ in_thread_main_loop_ = true;
+ vault_field_changed_.Broadcast();
ThreadMainLoop();
- thread_running_ = false;
- pthread_cond_broadcast(&changed_.condvar_);
- mutex_.Unlock();
- LOG(INFO) << "Syncer thread exiting.";
- return 0;
+ in_thread_main_loop_ = false;
+ vault_field_changed_.Broadcast();
+ LOG(INFO) << "Syncer thread ThreadMain is done.";
}
-void SyncerThread::SyncMain(Syncer* syncer) {
- CHECK(syncer);
- mutex_.Unlock();
- while (syncer->SyncShare()) {
- LOG(INFO) << "Looping in sync share";
- }
- LOG(INFO) << "Done looping in sync share";
-
- mutex_.Lock();
-}
-
-void SyncerThread::UpdateNudgeSource(const timespec& now,
- bool* continue_sync_cycle,
- bool* initial_sync) {
- bool nudged = false;
- NudgeSource nudge_source = kUnknown;
- // Has the previous sync cycle completed?
- if (continue_sync_cycle) {
- nudge_source = kContinuation;
- }
- // Update the nudge source if a new nudge has come through during the
- // previous sync cycle.
- while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) {
- if (!nudged) {
- nudge_source = nudge_queue_.top().second;
- *continue_sync_cycle = false; // Reset the continuation token on nudge.
- nudged = true;
- }
- nudge_queue_.pop();
- }
- SetUpdatesSource(nudged, nudge_source, initial_sync);
-}
-
-void SyncerThread::SetUpdatesSource(bool nudged, NudgeSource nudge_source,
- bool* initial_sync) {
- sync_pb::GetUpdatesCallerInfo::GET_UPDATES_SOURCE updates_source =
- sync_pb::GetUpdatesCallerInfo::UNKNOWN;
- if (*initial_sync) {
- updates_source = sync_pb::GetUpdatesCallerInfo::FIRST_UPDATE;
- *initial_sync = false;
- } else if (!nudged) {
- updates_source = sync_pb::GetUpdatesCallerInfo::PERIODIC;
- } else {
- switch (nudge_source) {
- case kNotification:
- updates_source = sync_pb::GetUpdatesCallerInfo::NOTIFICATION;
- break;
- case kLocal:
- updates_source = sync_pb::GetUpdatesCallerInfo::LOCAL;
- break;
- case kContinuation:
- updates_source = sync_pb::GetUpdatesCallerInfo::SYNC_CYCLE_CONTINUATION;
- break;
- case kUnknown:
- default:
- updates_source = sync_pb::GetUpdatesCallerInfo::UNKNOWN;
- break;
- }
- }
- syncer_->set_updates_source(updates_source);
-}
-
-void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) {
- MutexLock lock(&mutex_);
- channel()->NotifyListeners(event);
- if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
- return;
- }
- NudgeSyncImpl(event.nudge_delay_milliseconds, kUnknown);
-}
-
-void SyncerThread::HandleDirectoryManagerEvent(
- const syncable::DirectoryManagerEvent& event) {
- LOG(INFO) << "Handling a directory manager event";
- if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
- MutexLock lock(&mutex_);
- LOG(INFO) << "Syncer starting up for: " << event.dirname;
- // The underlying database structure is ready, and we should create
- // the syncer.
- CHECK(syncer_ == NULL);
- syncer_ =
- new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get());
-
- syncer_->set_command_channel(command_channel_);
- syncer_events_.reset(NewEventListenerHookup(
- syncer_->channel(), this, &SyncerThread::HandleSyncerEvent));
- pthread_cond_broadcast(&changed_.condvar_);
- }
-}
-
-static inline void CheckConnected(bool* connected,
- HttpResponse::ServerConnectionCode code,
- pthread_cond_t* condvar) {
- if (*connected) {
- if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
- *connected = false;
- pthread_cond_broadcast(condvar);
- }
- } else {
- if (HttpResponse::SERVER_CONNECTION_OK == code) {
- *connected = true;
- pthread_cond_broadcast(condvar);
- }
- }
-}
-
-void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) {
- conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
- &SyncerThread::HandleServerConnectionEvent));
- CheckConnected(&connected_, conn_mgr->server_status(),
- &changed_.condvar_);
-}
-
-void SyncerThread::HandleServerConnectionEvent(
- const ServerConnectionEvent& event) {
- if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
- MutexLock lock(&mutex_);
- CheckConnected(&connected_, event.connection_code,
- &changed_.condvar_);
- }
-}
-
-SyncerEventChannel* SyncerThread::channel() {
- return syncer_event_channel_.get();
-}
-
-// Inputs and return value in milliseconds.
-int SyncerThread::CalculateSyncWaitTime(int last_interval, int user_idle_ms) {
- // syncer_polling_interval_ is in seconds
- int syncer_polling_interval_ms = syncer_polling_interval_ * 1000;
-
- // This is our default and lower bound.
- int next_wait = syncer_polling_interval_ms;
-
- // Get idle time, bounded by max wait.
- int idle = min(user_idle_ms, syncer_max_interval_);
-
- // If the user has been idle for a while, we'll start decreasing the poll
- // rate.
- if (idle >= kPollBackoffThresholdMultiplier * syncer_polling_interval_ms) {
- next_wait = std::min(AllStatus::GetRecommendedDelaySeconds(
- last_interval / 1000), syncer_max_interval_ / 1000) * 1000;
- }
-
- return next_wait;
-}
-
-// Called with mutex_ already locked.
-void SyncerThread::NudgeSyncImpl(int milliseconds_from_now,
- NudgeSource source) {
- const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now);
- NudgeObject nudge_object(nudge_time, source);
- nudge_queue_.push(nudge_object);
- pthread_cond_broadcast(&changed_.condvar_);
-}
-
-void SyncerThread::WatchTalkMediator(TalkMediator* mediator) {
- talk_mediator_hookup_.reset(
- NewEventListenerHookup(
- mediator->channel(),
- this,
- &SyncerThread::HandleTalkMediatorEvent));
-}
-
-void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
- MutexLock lock(&mutex_);
- switch (event.what_happened) {
- case TalkMediatorEvent::LOGIN_SUCCEEDED:
- LOG(INFO) << "P2P: Login succeeded.";
- p2p_authenticated_ = true;
- break;
- case TalkMediatorEvent::LOGOUT_SUCCEEDED:
- LOG(INFO) << "P2P: Login succeeded.";
- p2p_authenticated_ = false;
- break;
- case TalkMediatorEvent::SUBSCRIPTIONS_ON:
- LOG(INFO) << "P2P: Subscriptions successfully enabled.";
- p2p_subscribed_ = true;
- if (NULL != syncer_) {
- LOG(INFO) << "Subscriptions on. Nudging syncer for initial push.";
- NudgeSyncImpl(0, kLocal);
- }
- break;
- case TalkMediatorEvent::SUBSCRIPTIONS_OFF:
- LOG(INFO) << "P2P: Subscriptions are not enabled.";
- p2p_subscribed_ = false;
- break;
- case TalkMediatorEvent::NOTIFICATION_RECEIVED:
- LOG(INFO) << "P2P: Updates on server, pushing syncer";
- if (NULL != syncer_) {
- NudgeSyncImpl(0, kNotification);
- }
- break;
- default:
- break;
- }
-
- if (NULL != syncer_) {
- syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_);
- }
-}
-
-} // namespace browser_sync
+} // namespace browser_sync
« no previous file with comments | « chrome/browser/sync/engine/syncer_thread_timed_stop.h ('k') | chrome/browser/sync/engine/syncer_thread_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698