Index: chrome/browser/sync/engine/syncer_thread.cc |
=================================================================== |
--- chrome/browser/sync/engine/syncer_thread.cc (revision 27084) |
+++ chrome/browser/sync/engine/syncer_thread.cc (working copy) |
@@ -1,7 +1,6 @@ |
// Copyright (c) 2009 The Chromium Authors. All rights reserved. |
// Use of this source code is governed by a BSD-style license that can be |
// found in the LICENSE file. |
- |
#include "chrome/browser/sync/engine/syncer_thread.h" |
#include "build/build_config.h" |
@@ -16,21 +15,24 @@ |
#include <map> |
#include <queue> |
+#include "base/command_line.h" |
#include "chrome/browser/sync/engine/auth_watcher.h" |
#include "chrome/browser/sync/engine/model_safe_worker.h" |
#include "chrome/browser/sync/engine/net/server_connection_manager.h" |
#include "chrome/browser/sync/engine/syncer.h" |
+#include "chrome/browser/sync/engine/syncer_thread_pthreads.h" |
+#include "chrome/browser/sync/engine/syncer_thread_timed_stop.h" |
#include "chrome/browser/sync/notifier/listener/talk_mediator.h" |
#include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h" |
#include "chrome/browser/sync/syncable/directory_manager.h" |
+#include "chrome/common/chrome_switches.h" |
using std::priority_queue; |
using std::min; |
+using base::Time; |
+using base::TimeDelta; |
+using base::TimeTicks; |
-static inline bool operator < (const timespec& a, const timespec& b) { |
- return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec; |
-} |
- |
namespace { |
// Returns the amount of time since the user last interacted with the computer, |
@@ -108,17 +110,56 @@ |
namespace browser_sync { |
+SyncerThread* SyncerThreadFactory::Create( |
+ ClientCommandChannel* command_channel, |
+ syncable::DirectoryManager* mgr, |
+ ServerConnectionManager* connection_manager, AllStatus* all_status, |
+ ModelSafeWorker* model_safe_worker) { |
+ const CommandLine* cmd = CommandLine::ForCurrentProcess(); |
+ if (cmd->HasSwitch(switches::kSyncerThreadTimedStop)) { |
+ return new SyncerThreadTimedStop(command_channel, mgr, connection_manager, |
+ all_status, model_safe_worker); |
+ } else if (cmd->HasSwitch(switches::kSyncerThreadPthreads)) { |
+ return new SyncerThreadPthreads(command_channel, mgr, connection_manager, |
+ all_status, model_safe_worker); |
+ } else { |
+ // The default SyncerThread implementation, which does not time-out when |
+ // Stop is called. |
+ return new SyncerThread(command_channel, mgr, connection_manager, |
+ all_status, model_safe_worker); |
+ } |
+} |
+ |
bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) { |
- MutexLock lock(&mutex_); |
- if (syncer_ == NULL) { |
+ AutoLock lock(lock_); |
+ if (vault_.syncer_ == NULL) { |
return false; |
} |
NudgeSyncImpl(milliseconds_from_now, source); |
return true; |
} |
-void* RunSyncerThread(void* syncer_thread) { |
- return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain(); |
+SyncerThread::SyncerThread() |
+ : thread_main_started_(false, false), |
+ thread_("SyncEngine_SyncerThread"), |
+ vault_field_changed_(&lock_), |
+ p2p_authenticated_(false), |
+ p2p_subscribed_(false), |
+ client_command_hookup_(NULL), |
+ conn_mgr_hookup_(NULL), |
+ allstatus_(NULL), |
+ dirman_(NULL), |
+ scm_(NULL), |
+ syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), |
+ syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), |
+ syncer_polling_interval_(kDefaultShortPollIntervalSeconds), |
+ syncer_max_interval_(kDefaultMaxPollIntervalMs), |
+ talk_mediator_hookup_(NULL), |
+ command_channel_(NULL), |
+ directory_manager_hookup_(NULL), |
+ syncer_events_(NULL), |
+ model_safe_worker_(NULL), |
+ disable_idle_detection_(false) { |
} |
SyncerThread::SyncerThread( |
@@ -127,18 +168,26 @@ |
ServerConnectionManager* connection_manager, |
AllStatus* all_status, |
ModelSafeWorker* model_safe_worker) |
- : dirman_(mgr), scm_(connection_manager), |
- syncer_(NULL), syncer_events_(NULL), thread_running_(false), |
+ : thread_main_started_(false, false), |
+ thread_("SyncEngine_SyncerThread"), |
+ vault_field_changed_(&lock_), |
+ p2p_authenticated_(false), |
+ p2p_subscribed_(false), |
+ client_command_hookup_(NULL), |
+ conn_mgr_hookup_(NULL), |
+ allstatus_(all_status), |
+ dirman_(mgr), |
+ scm_(connection_manager), |
syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds), |
syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds), |
syncer_polling_interval_(kDefaultShortPollIntervalSeconds), |
syncer_max_interval_(kDefaultMaxPollIntervalMs), |
- stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL), |
- p2p_authenticated_(false), p2p_subscribed_(false), |
- allstatus_(all_status), talk_mediator_hookup_(NULL), |
- command_channel_(command_channel), directory_manager_hookup_(NULL), |
+ talk_mediator_hookup_(NULL), |
+ command_channel_(command_channel), |
+ directory_manager_hookup_(NULL), |
+ syncer_events_(NULL), |
model_safe_worker_(model_safe_worker), |
- client_command_hookup_(NULL), disable_idle_detection_(false) { |
+ disable_idle_detection_(false) { |
SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE }; |
syncer_event_channel_.reset(new SyncerEventChannel(shutdown)); |
@@ -163,55 +212,76 @@ |
syncer_event_channel_.reset(); |
directory_manager_hookup_.reset(); |
syncer_events_.reset(); |
- delete syncer_; |
+ delete vault_.syncer_; |
talk_mediator_hookup_.reset(); |
- CHECK(!thread_running_); |
+ CHECK(!thread_.IsRunning()); |
} |
// Creates and starts a syncer thread. |
// Returns true if it creates a thread or if there's currently a thread running |
// and false otherwise. |
bool SyncerThread::Start() { |
- MutexLock lock(&mutex_); |
- if (thread_running_) { |
- return true; |
+ { |
+ AutoLock lock(lock_); |
+ if (thread_.IsRunning()) { |
+ return true; |
+ } |
+ |
+ if (!thread_.Start()) { |
+ return false; |
+ } |
} |
- thread_running_ = |
- (0 == pthread_create(&thread_, NULL, RunSyncerThread, this)); |
- if (thread_running_) { |
- pthread_detach(thread_); |
- } |
- return thread_running_; |
+ |
+ thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this, |
+ &SyncerThread::ThreadMain)); |
+ |
+ // Wait for notification that our task makes it safely onto the message |
+ // loop before returning, so the caller can't call Stop before we're |
+ // actually up and running. This is for consistency with the old pthread |
+ // impl because pthread_create would do this in one step. |
+ thread_main_started_.Wait(); |
+ LOG(INFO) << "SyncerThread started."; |
+ return true; |
} |
// Stop processing. A max wait of at least 2*server RTT time is recommended. |
// Returns true if we stopped, false otherwise. |
bool SyncerThread::Stop(int max_wait) { |
- MutexLock lock(&mutex_); |
- if (!thread_running_) |
- return true; |
- stop_syncer_thread_ = true; |
- if (NULL != syncer_) { |
- // Try to early exit the syncer. |
- syncer_->RequestEarlyExit(); |
+ { |
+ AutoLock lock(lock_); |
+ // If the thread has been started, then we either already have or are about |
+ // to enter ThreadMainLoop so we have to proceed with shutdown and wait for |
+ // it to finish. If the thread has not been started --and we now own the |
+ // lock-- then we can early out because the caller has not called Start(). |
+ if (!thread_.IsRunning()) |
+ return true; |
+ |
+ LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to " |
+ << "true (vault_.stop_syncer_thread_)"; |
+ // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit |
+ // below). |
+ vault_.stop_syncer_thread_ = true; |
+ if (NULL != vault_.syncer_) { |
+ // Try to early exit the syncer itself, which could be looping inside |
+ // SyncShare. |
+ vault_.syncer_->RequestEarlyExit(); |
+ } |
+ |
+ // stop_syncer_thread_ is now true and the Syncer has been told to exit. |
+ // We want to wake up all waiters so they can re-examine state. We signal, |
+ // causing all waiters to try to re-acquire the lock, and then we release |
+ // the lock, and join on our internal thread which should soon run off the |
+ // end of ThreadMain. |
+ vault_field_changed_.Broadcast(); |
} |
- pthread_cond_broadcast(&changed_.condvar_); |
- timespec deadline = { time(NULL) + (max_wait / 1000), 0 }; |
- do { |
- const int wait_result = max_wait < 0 ? |
- pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) : |
- pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, |
- &deadline); |
- if (ETIMEDOUT == wait_result) { |
- LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely."; |
- return false; |
- } |
- } while (thread_running_); |
+ |
+ // This will join, and finish when ThreadMain terminates. |
+ thread_.Stop(); |
return true; |
} |
void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) { |
- PThreadScopedLock<PThreadMutex> lock(&mutex_); |
+ AutoLock lock(lock_); |
client_command_hookup_.reset(NewEventListenerHookup(channel, this, |
&SyncerThread::HandleClientCommand)); |
} |
@@ -232,67 +302,89 @@ |
} |
void SyncerThread::ThreadMainLoop() { |
+ // This is called with lock_ acquired. |
+ lock_.AssertAcquired(); |
+ LOG(INFO) << "In thread main loop."; |
+ |
// Use the short poll value by default. |
- int poll_seconds = syncer_short_poll_interval_seconds_; |
+ TimeDelta poll_seconds = |
+ TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_); |
int user_idle_milliseconds = 0; |
- timespec last_sync_time = { 0 }; |
+ TimeTicks last_sync_time; |
bool initial_sync_for_thread = true; |
bool continue_sync_cycle = false; |
- while (!stop_syncer_thread_) { |
- if (!connected_) { |
+ while (!vault_.stop_syncer_thread_) { |
+ // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as |
+ // below) because we cannot poll until these conditions are met, so we wait |
+ // indefinitely. |
+ if (!vault_.connected_) { |
LOG(INFO) << "Syncer thread waiting for connection."; |
- while (!connected_ && !stop_syncer_thread_) |
- pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); |
- LOG_IF(INFO, connected_) << "Syncer thread found connection."; |
+ while (!vault_.connected_ && !vault_.stop_syncer_thread_) |
+ vault_field_changed_.Wait(); |
+ LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection."; |
continue; |
} |
- if (syncer_ == NULL) { |
+ if (vault_.syncer_ == NULL) { |
LOG(INFO) << "Syncer thread waiting for database initialization."; |
- while (syncer_ == NULL && !stop_syncer_thread_) |
- pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_); |
- LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started."; |
+ while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_) |
+ vault_field_changed_.Wait(); |
+ LOG_IF(INFO, !(vault_.syncer_ == NULL)) |
+ << "Syncer was found after DB started."; |
continue; |
} |
- timespec const next_poll = { last_sync_time.tv_sec + poll_seconds, |
- last_sync_time.tv_nsec }; |
- const timespec wake_time = |
- !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ? |
- nudge_queue_.top().first : next_poll; |
- LOG(INFO) << "wake time is " << wake_time.tv_sec; |
- LOG(INFO) << "next poll is " << next_poll.tv_sec; |
+ const TimeTicks next_poll = last_sync_time + poll_seconds; |
+ const TimeTicks end_wait = |
+ !vault_.nudge_queue_.empty() && |
+ vault_.nudge_queue_.top().first < next_poll ? |
+ vault_.nudge_queue_.top().first : next_poll; |
+ LOG(INFO) << "end_wait is " << end_wait.ToInternalValue(); |
+ LOG(INFO) << "next_poll is " << next_poll.ToInternalValue(); |
- const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_, |
- &wake_time); |
- if (ETIMEDOUT != error) { |
- continue; // Check all the conditions again. |
+ // We block until the CV is signaled (e.g a control field changed, loss of |
+ // network connection, nudge, spurious, etc), or the poll interval elapses. |
+ TimeDelta sleep_time = end_wait - TimeTicks::Now(); |
+ if (sleep_time > TimeDelta::FromSeconds(0)) { |
+ vault_field_changed_.TimedWait(sleep_time); |
+ |
+ if (TimeTicks::Now() < end_wait) { |
+ // Didn't timeout. Could be a spurious signal, or a signal corresponding |
+ // to an actual change in one of our control fields. By continuing here |
+ // we perform the typical "always recheck conditions when signaled", |
+ // (typically handled by a while(condition_not_met) cv.wait() construct) |
+ // because we jump to the top of the loop. The main difference is we |
+ // recalculate the wait interval, but last_sync_time won't have changed. |
+ // So if we were signaled by a nudge (for ex.) we'll grab the new nudge |
+ // off the queue and wait for that delta. If it was a spurious signal, |
+ // we'll keep waiting for the same moment in time as we just were. |
+ continue; |
+ } |
} |
- const timespec now = GetPThreadAbsoluteTime(0); |
- |
// Handle a nudge, caused by either a notification or a local bookmark |
// event. This will also update the source of the following SyncMain call. |
- UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread); |
+ UpdateNudgeSource(&continue_sync_cycle, &initial_sync_for_thread); |
- LOG(INFO) << "Calling Sync Main at time " << now.tv_sec; |
- SyncMain(syncer_); |
- last_sync_time = now; |
+ LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue(); |
+ SyncMain(vault_.syncer_); |
+ last_sync_time = TimeTicks::Now(); |
LOG(INFO) << "Updating the next polling time after SyncMain"; |
- poll_seconds = CalculatePollingWaitTime(allstatus_->status(), |
- poll_seconds, |
- &user_idle_milliseconds, |
- &continue_sync_cycle); |
+ poll_seconds = TimeDelta::FromSeconds(CalculatePollingWaitTime( |
+ allstatus_->status(), static_cast<int>(poll_seconds.InSeconds()), |
+ &user_idle_milliseconds, &continue_sync_cycle)); |
} |
+ |
} |
// We check how long the user's been idle and sync less often if the machine is |
// not in use. The aim is to reduce server load. |
+// TODO(timsteele): Should use Time(Delta). |
int SyncerThread::CalculatePollingWaitTime( |
const AllStatus::Status& status, |
- int last_poll_wait, // in s |
+ int last_poll_wait, // Time in seconds. |
int* user_idle_milliseconds, |
bool* continue_sync_cycle) { |
bool is_continuing_sync_cyle = *continue_sync_cycle; |
@@ -343,30 +435,25 @@ |
return actual_next_wait; |
} |
-void* SyncerThread::ThreadMain() { |
- NameCurrentThreadForDebugging("SyncEngine_SyncerThread"); |
- mutex_.Lock(); |
+void SyncerThread::ThreadMain() { |
+ AutoLock lock(lock_); |
+ // Signal Start() to let it know we've made it safely onto the message loop, |
+ // and unblock it's caller. |
+ thread_main_started_.Signal(); |
ThreadMainLoop(); |
- thread_running_ = false; |
- pthread_cond_broadcast(&changed_.condvar_); |
- mutex_.Unlock(); |
- LOG(INFO) << "Syncer thread exiting."; |
- return 0; |
+ LOG(INFO) << "Syncer thread ThreadMain is done."; |
} |
void SyncerThread::SyncMain(Syncer* syncer) { |
CHECK(syncer); |
- mutex_.Unlock(); |
+ AutoUnlock unlock(lock_); |
while (syncer->SyncShare()) { |
LOG(INFO) << "Looping in sync share"; |
} |
LOG(INFO) << "Done looping in sync share"; |
- |
- mutex_.Lock(); |
} |
-void SyncerThread::UpdateNudgeSource(const timespec& now, |
- bool* continue_sync_cycle, |
+void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle, |
bool* initial_sync) { |
bool nudged = false; |
NudgeSource nudge_source = kUnknown; |
@@ -376,13 +463,14 @@ |
} |
// Update the nudge source if a new nudge has come through during the |
// previous sync cycle. |
- while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) { |
+ while (!vault_.nudge_queue_.empty() && |
+ TimeTicks::Now() >= vault_.nudge_queue_.top().first) { |
if (!nudged) { |
- nudge_source = nudge_queue_.top().second; |
+ nudge_source = vault_.nudge_queue_.top().second; |
*continue_sync_cycle = false; // Reset the continuation token on nudge. |
nudged = true; |
} |
- nudge_queue_.pop(); |
+ vault_.nudge_queue_.pop(); |
} |
SetUpdatesSource(nudged, nudge_source, initial_sync); |
} |
@@ -413,11 +501,11 @@ |
break; |
} |
} |
- syncer_->set_updates_source(updates_source); |
+ vault_.syncer_->set_updates_source(updates_source); |
} |
void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) { |
- MutexLock lock(&mutex_); |
+ AutoLock lock(lock_); |
channel()->NotifyListeners(event); |
if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) { |
return; |
@@ -429,33 +517,33 @@ |
const syncable::DirectoryManagerEvent& event) { |
LOG(INFO) << "Handling a directory manager event"; |
if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) { |
- MutexLock lock(&mutex_); |
+ AutoLock lock(lock_); |
LOG(INFO) << "Syncer starting up for: " << event.dirname; |
// The underlying database structure is ready, and we should create |
// the syncer. |
- CHECK(syncer_ == NULL); |
- syncer_ = |
+ CHECK(vault_.syncer_ == NULL); |
+ vault_.syncer_ = |
new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get()); |
- syncer_->set_command_channel(command_channel_); |
+ vault_.syncer_->set_command_channel(command_channel_); |
syncer_events_.reset(NewEventListenerHookup( |
- syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); |
- pthread_cond_broadcast(&changed_.condvar_); |
+ vault_.syncer_->channel(), this, &SyncerThread::HandleSyncerEvent)); |
+ vault_field_changed_.Broadcast(); |
} |
} |
static inline void CheckConnected(bool* connected, |
HttpResponse::ServerConnectionCode code, |
- pthread_cond_t* condvar) { |
+ ConditionVariable* condvar) { |
if (*connected) { |
if (HttpResponse::CONNECTION_UNAVAILABLE == code) { |
*connected = false; |
- pthread_cond_broadcast(condvar); |
+ condvar->Broadcast(); |
} |
} else { |
if (HttpResponse::SERVER_CONNECTION_OK == code) { |
*connected = true; |
- pthread_cond_broadcast(condvar); |
+ condvar->Broadcast(); |
} |
} |
} |
@@ -463,16 +551,16 @@ |
void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) { |
conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this, |
&SyncerThread::HandleServerConnectionEvent)); |
- CheckConnected(&connected_, conn_mgr->server_status(), |
- &changed_.condvar_); |
+ CheckConnected(&vault_.connected_, conn_mgr->server_status(), |
+ &vault_field_changed_); |
} |
void SyncerThread::HandleServerConnectionEvent( |
const ServerConnectionEvent& event) { |
if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) { |
- MutexLock lock(&mutex_); |
- CheckConnected(&connected_, event.connection_code, |
- &changed_.condvar_); |
+ AutoLock lock(lock_); |
+ CheckConnected(&vault_.connected_, event.connection_code, |
+ &vault_field_changed_); |
} |
} |
@@ -504,10 +592,11 @@ |
// Called with mutex_ already locked. |
void SyncerThread::NudgeSyncImpl(int milliseconds_from_now, |
NudgeSource source) { |
- const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now); |
+ const TimeTicks nudge_time = TimeTicks::Now() + |
+ TimeDelta::FromMilliseconds(milliseconds_from_now); |
NudgeObject nudge_object(nudge_time, source); |
- nudge_queue_.push(nudge_object); |
- pthread_cond_broadcast(&changed_.condvar_); |
+ vault_.nudge_queue_.push(nudge_object); |
+ vault_field_changed_.Broadcast(); |
} |
void SyncerThread::WatchTalkMediator(TalkMediator* mediator) { |
@@ -519,7 +608,7 @@ |
} |
void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) { |
- MutexLock lock(&mutex_); |
+ AutoLock lock(lock_); |
switch (event.what_happened) { |
case TalkMediatorEvent::LOGIN_SUCCEEDED: |
LOG(INFO) << "P2P: Login succeeded."; |
@@ -532,7 +621,7 @@ |
case TalkMediatorEvent::SUBSCRIPTIONS_ON: |
LOG(INFO) << "P2P: Subscriptions successfully enabled."; |
p2p_subscribed_ = true; |
- if (NULL != syncer_) { |
+ if (NULL != vault_.syncer_) { |
LOG(INFO) << "Subscriptions on. Nudging syncer for initial push."; |
NudgeSyncImpl(0, kLocal); |
} |
@@ -543,7 +632,7 @@ |
break; |
case TalkMediatorEvent::NOTIFICATION_RECEIVED: |
LOG(INFO) << "P2P: Updates on server, pushing syncer"; |
- if (NULL != syncer_) { |
+ if (NULL != vault_.syncer_) { |
NudgeSyncImpl(0, kNotification); |
} |
break; |
@@ -551,8 +640,9 @@ |
break; |
} |
- if (NULL != syncer_) { |
- syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_); |
+ if (NULL != vault_.syncer_) { |
+ vault_.syncer_->set_notifications_enabled( |
+ p2p_authenticated_ && p2p_subscribed_); |
} |
} |