| Index: chrome/browser/sync/engine/syncer_thread.cc
|
| ===================================================================
|
| --- chrome/browser/sync/engine/syncer_thread.cc (revision 27084)
|
| +++ chrome/browser/sync/engine/syncer_thread.cc (working copy)
|
| @@ -1,7 +1,6 @@
|
| // Copyright (c) 2009 The Chromium Authors. All rights reserved.
|
| // Use of this source code is governed by a BSD-style license that can be
|
| // found in the LICENSE file.
|
| -
|
| #include "chrome/browser/sync/engine/syncer_thread.h"
|
|
|
| #include "build/build_config.h"
|
| @@ -16,21 +15,24 @@
|
| #include <map>
|
| #include <queue>
|
|
|
| +#include "base/command_line.h"
|
| #include "chrome/browser/sync/engine/auth_watcher.h"
|
| #include "chrome/browser/sync/engine/model_safe_worker.h"
|
| #include "chrome/browser/sync/engine/net/server_connection_manager.h"
|
| #include "chrome/browser/sync/engine/syncer.h"
|
| +#include "chrome/browser/sync/engine/syncer_thread_pthreads.h"
|
| +#include "chrome/browser/sync/engine/syncer_thread_timed_stop.h"
|
| #include "chrome/browser/sync/notifier/listener/talk_mediator.h"
|
| #include "chrome/browser/sync/notifier/listener/talk_mediator_impl.h"
|
| #include "chrome/browser/sync/syncable/directory_manager.h"
|
| +#include "chrome/common/chrome_switches.h"
|
|
|
| using std::priority_queue;
|
| using std::min;
|
| +using base::Time;
|
| +using base::TimeDelta;
|
| +using base::TimeTicks;
|
|
|
| -static inline bool operator < (const timespec& a, const timespec& b) {
|
| - return a.tv_sec == b.tv_sec ? a.tv_nsec < b.tv_nsec : a.tv_sec < b.tv_sec;
|
| -}
|
| -
|
| namespace {
|
|
|
| // Returns the amount of time since the user last interacted with the computer,
|
| @@ -108,17 +110,56 @@
|
|
|
| namespace browser_sync {
|
|
|
| +SyncerThread* SyncerThreadFactory::Create(
|
| + ClientCommandChannel* command_channel,
|
| + syncable::DirectoryManager* mgr,
|
| + ServerConnectionManager* connection_manager, AllStatus* all_status,
|
| + ModelSafeWorker* model_safe_worker) {
|
| + const CommandLine* cmd = CommandLine::ForCurrentProcess();
|
| + if (cmd->HasSwitch(switches::kSyncerThreadTimedStop)) {
|
| + return new SyncerThreadTimedStop(command_channel, mgr, connection_manager,
|
| + all_status, model_safe_worker);
|
| + } else if (cmd->HasSwitch(switches::kSyncerThreadPthreads)) {
|
| + return new SyncerThreadPthreads(command_channel, mgr, connection_manager,
|
| + all_status, model_safe_worker);
|
| + } else {
|
| + // The default SyncerThread implementation, which does not time-out when
|
| + // Stop is called.
|
| + return new SyncerThread(command_channel, mgr, connection_manager,
|
| + all_status, model_safe_worker);
|
| + }
|
| +}
|
| +
|
| bool SyncerThread::NudgeSyncer(int milliseconds_from_now, NudgeSource source) {
|
| - MutexLock lock(&mutex_);
|
| - if (syncer_ == NULL) {
|
| + AutoLock lock(lock_);
|
| + if (vault_.syncer_ == NULL) {
|
| return false;
|
| }
|
| NudgeSyncImpl(milliseconds_from_now, source);
|
| return true;
|
| }
|
|
|
| -void* RunSyncerThread(void* syncer_thread) {
|
| - return (reinterpret_cast<SyncerThread*>(syncer_thread))->ThreadMain();
|
| +SyncerThread::SyncerThread()
|
| + : thread_main_started_(false, false),
|
| + thread_("SyncEngine_SyncerThread"),
|
| + vault_field_changed_(&lock_),
|
| + p2p_authenticated_(false),
|
| + p2p_subscribed_(false),
|
| + client_command_hookup_(NULL),
|
| + conn_mgr_hookup_(NULL),
|
| + allstatus_(NULL),
|
| + dirman_(NULL),
|
| + scm_(NULL),
|
| + syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
|
| + syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
|
| + syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
|
| + syncer_max_interval_(kDefaultMaxPollIntervalMs),
|
| + talk_mediator_hookup_(NULL),
|
| + command_channel_(NULL),
|
| + directory_manager_hookup_(NULL),
|
| + syncer_events_(NULL),
|
| + model_safe_worker_(NULL),
|
| + disable_idle_detection_(false) {
|
| }
|
|
|
| SyncerThread::SyncerThread(
|
| @@ -127,18 +168,26 @@
|
| ServerConnectionManager* connection_manager,
|
| AllStatus* all_status,
|
| ModelSafeWorker* model_safe_worker)
|
| - : dirman_(mgr), scm_(connection_manager),
|
| - syncer_(NULL), syncer_events_(NULL), thread_running_(false),
|
| + : thread_main_started_(false, false),
|
| + thread_("SyncEngine_SyncerThread"),
|
| + vault_field_changed_(&lock_),
|
| + p2p_authenticated_(false),
|
| + p2p_subscribed_(false),
|
| + client_command_hookup_(NULL),
|
| + conn_mgr_hookup_(NULL),
|
| + allstatus_(all_status),
|
| + dirman_(mgr),
|
| + scm_(connection_manager),
|
| syncer_short_poll_interval_seconds_(kDefaultShortPollIntervalSeconds),
|
| syncer_long_poll_interval_seconds_(kDefaultLongPollIntervalSeconds),
|
| syncer_polling_interval_(kDefaultShortPollIntervalSeconds),
|
| syncer_max_interval_(kDefaultMaxPollIntervalMs),
|
| - stop_syncer_thread_(false), connected_(false), conn_mgr_hookup_(NULL),
|
| - p2p_authenticated_(false), p2p_subscribed_(false),
|
| - allstatus_(all_status), talk_mediator_hookup_(NULL),
|
| - command_channel_(command_channel), directory_manager_hookup_(NULL),
|
| + talk_mediator_hookup_(NULL),
|
| + command_channel_(command_channel),
|
| + directory_manager_hookup_(NULL),
|
| + syncer_events_(NULL),
|
| model_safe_worker_(model_safe_worker),
|
| - client_command_hookup_(NULL), disable_idle_detection_(false) {
|
| + disable_idle_detection_(false) {
|
|
|
| SyncerEvent shutdown = { SyncerEvent::SHUTDOWN_USE_WITH_CARE };
|
| syncer_event_channel_.reset(new SyncerEventChannel(shutdown));
|
| @@ -163,55 +212,76 @@
|
| syncer_event_channel_.reset();
|
| directory_manager_hookup_.reset();
|
| syncer_events_.reset();
|
| - delete syncer_;
|
| + delete vault_.syncer_;
|
| talk_mediator_hookup_.reset();
|
| - CHECK(!thread_running_);
|
| + CHECK(!thread_.IsRunning());
|
| }
|
|
|
| // Creates and starts a syncer thread.
|
| // Returns true if it creates a thread or if there's currently a thread running
|
| // and false otherwise.
|
| bool SyncerThread::Start() {
|
| - MutexLock lock(&mutex_);
|
| - if (thread_running_) {
|
| - return true;
|
| + {
|
| + AutoLock lock(lock_);
|
| + if (thread_.IsRunning()) {
|
| + return true;
|
| + }
|
| +
|
| + if (!thread_.Start()) {
|
| + return false;
|
| + }
|
| }
|
| - thread_running_ =
|
| - (0 == pthread_create(&thread_, NULL, RunSyncerThread, this));
|
| - if (thread_running_) {
|
| - pthread_detach(thread_);
|
| - }
|
| - return thread_running_;
|
| +
|
| + thread_.message_loop()->PostTask(FROM_HERE, NewRunnableMethod(this,
|
| + &SyncerThread::ThreadMain));
|
| +
|
| + // Wait for notification that our task makes it safely onto the message
|
| + // loop before returning, so the caller can't call Stop before we're
|
| + // actually up and running. This is for consistency with the old pthread
|
| + // impl because pthread_create would do this in one step.
|
| + thread_main_started_.Wait();
|
| + LOG(INFO) << "SyncerThread started.";
|
| + return true;
|
| }
|
|
|
| // Stop processing. A max wait of at least 2*server RTT time is recommended.
|
| // Returns true if we stopped, false otherwise.
|
| bool SyncerThread::Stop(int max_wait) {
|
| - MutexLock lock(&mutex_);
|
| - if (!thread_running_)
|
| - return true;
|
| - stop_syncer_thread_ = true;
|
| - if (NULL != syncer_) {
|
| - // Try to early exit the syncer.
|
| - syncer_->RequestEarlyExit();
|
| + {
|
| + AutoLock lock(lock_);
|
| + // If the thread has been started, then we either already have or are about
|
| + // to enter ThreadMainLoop so we have to proceed with shutdown and wait for
|
| + // it to finish. If the thread has not been started --and we now own the
|
| + // lock-- then we can early out because the caller has not called Start().
|
| + if (!thread_.IsRunning())
|
| + return true;
|
| +
|
| + LOG(INFO) << "SyncerThread::Stop - setting ThreadMain exit condition to "
|
| + << "true (vault_.stop_syncer_thread_)";
|
| + // Exit the ThreadMainLoop once the syncer finishes (we tell it to exit
|
| + // below).
|
| + vault_.stop_syncer_thread_ = true;
|
| + if (NULL != vault_.syncer_) {
|
| + // Try to early exit the syncer itself, which could be looping inside
|
| + // SyncShare.
|
| + vault_.syncer_->RequestEarlyExit();
|
| + }
|
| +
|
| + // stop_syncer_thread_ is now true and the Syncer has been told to exit.
|
| + // We want to wake up all waiters so they can re-examine state. We signal,
|
| + // causing all waiters to try to re-acquire the lock, and then we release
|
| + // the lock, and join on our internal thread which should soon run off the
|
| + // end of ThreadMain.
|
| + vault_field_changed_.Broadcast();
|
| }
|
| - pthread_cond_broadcast(&changed_.condvar_);
|
| - timespec deadline = { time(NULL) + (max_wait / 1000), 0 };
|
| - do {
|
| - const int wait_result = max_wait < 0 ?
|
| - pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_) :
|
| - pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
|
| - &deadline);
|
| - if (ETIMEDOUT == wait_result) {
|
| - LOG(ERROR) << "SyncerThread::Stop timed out. Problems likely.";
|
| - return false;
|
| - }
|
| - } while (thread_running_);
|
| +
|
| + // This will join, and finish when ThreadMain terminates.
|
| + thread_.Stop();
|
| return true;
|
| }
|
|
|
| void SyncerThread::WatchClientCommands(ClientCommandChannel* channel) {
|
| - PThreadScopedLock<PThreadMutex> lock(&mutex_);
|
| + AutoLock lock(lock_);
|
| client_command_hookup_.reset(NewEventListenerHookup(channel, this,
|
| &SyncerThread::HandleClientCommand));
|
| }
|
| @@ -232,67 +302,89 @@
|
| }
|
|
|
| void SyncerThread::ThreadMainLoop() {
|
| + // This is called with lock_ acquired.
|
| + lock_.AssertAcquired();
|
| + LOG(INFO) << "In thread main loop.";
|
| +
|
| // Use the short poll value by default.
|
| - int poll_seconds = syncer_short_poll_interval_seconds_;
|
| + TimeDelta poll_seconds =
|
| + TimeDelta::FromSeconds(syncer_short_poll_interval_seconds_);
|
| int user_idle_milliseconds = 0;
|
| - timespec last_sync_time = { 0 };
|
| + TimeTicks last_sync_time;
|
| bool initial_sync_for_thread = true;
|
| bool continue_sync_cycle = false;
|
|
|
| - while (!stop_syncer_thread_) {
|
| - if (!connected_) {
|
| + while (!vault_.stop_syncer_thread_) {
|
| + // The Wait()s in these conditionals using |vault_| are not TimedWait()s (as
|
| + // below) because we cannot poll until these conditions are met, so we wait
|
| + // indefinitely.
|
| + if (!vault_.connected_) {
|
| LOG(INFO) << "Syncer thread waiting for connection.";
|
| - while (!connected_ && !stop_syncer_thread_)
|
| - pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
|
| - LOG_IF(INFO, connected_) << "Syncer thread found connection.";
|
| + while (!vault_.connected_ && !vault_.stop_syncer_thread_)
|
| + vault_field_changed_.Wait();
|
| + LOG_IF(INFO, vault_.connected_) << "Syncer thread found connection.";
|
| continue;
|
| }
|
|
|
| - if (syncer_ == NULL) {
|
| + if (vault_.syncer_ == NULL) {
|
| LOG(INFO) << "Syncer thread waiting for database initialization.";
|
| - while (syncer_ == NULL && !stop_syncer_thread_)
|
| - pthread_cond_wait(&changed_.condvar_, &mutex_.mutex_);
|
| - LOG_IF(INFO, !(syncer_ == NULL)) << "Syncer was found after DB started.";
|
| + while (vault_.syncer_ == NULL && !vault_.stop_syncer_thread_)
|
| + vault_field_changed_.Wait();
|
| + LOG_IF(INFO, !(vault_.syncer_ == NULL))
|
| + << "Syncer was found after DB started.";
|
| continue;
|
| }
|
|
|
| - timespec const next_poll = { last_sync_time.tv_sec + poll_seconds,
|
| - last_sync_time.tv_nsec };
|
| - const timespec wake_time =
|
| - !nudge_queue_.empty() && nudge_queue_.top().first < next_poll ?
|
| - nudge_queue_.top().first : next_poll;
|
| - LOG(INFO) << "wake time is " << wake_time.tv_sec;
|
| - LOG(INFO) << "next poll is " << next_poll.tv_sec;
|
| + const TimeTicks next_poll = last_sync_time + poll_seconds;
|
| + const TimeTicks end_wait =
|
| + !vault_.nudge_queue_.empty() &&
|
| + vault_.nudge_queue_.top().first < next_poll ?
|
| + vault_.nudge_queue_.top().first : next_poll;
|
| + LOG(INFO) << "end_wait is " << end_wait.ToInternalValue();
|
| + LOG(INFO) << "next_poll is " << next_poll.ToInternalValue();
|
|
|
| - const int error = pthread_cond_timedwait(&changed_.condvar_, &mutex_.mutex_,
|
| - &wake_time);
|
| - if (ETIMEDOUT != error) {
|
| - continue; // Check all the conditions again.
|
| + // We block until the CV is signaled (e.g a control field changed, loss of
|
| + // network connection, nudge, spurious, etc), or the poll interval elapses.
|
| + TimeDelta sleep_time = end_wait - TimeTicks::Now();
|
| + if (sleep_time > TimeDelta::FromSeconds(0)) {
|
| + vault_field_changed_.TimedWait(sleep_time);
|
| +
|
| + if (TimeTicks::Now() < end_wait) {
|
| + // Didn't timeout. Could be a spurious signal, or a signal corresponding
|
| + // to an actual change in one of our control fields. By continuing here
|
| + // we perform the typical "always recheck conditions when signaled",
|
| + // (typically handled by a while(condition_not_met) cv.wait() construct)
|
| + // because we jump to the top of the loop. The main difference is we
|
| + // recalculate the wait interval, but last_sync_time won't have changed.
|
| + // So if we were signaled by a nudge (for ex.) we'll grab the new nudge
|
| + // off the queue and wait for that delta. If it was a spurious signal,
|
| + // we'll keep waiting for the same moment in time as we just were.
|
| + continue;
|
| + }
|
| }
|
|
|
| - const timespec now = GetPThreadAbsoluteTime(0);
|
| -
|
| // Handle a nudge, caused by either a notification or a local bookmark
|
| // event. This will also update the source of the following SyncMain call.
|
| - UpdateNudgeSource(now, &continue_sync_cycle, &initial_sync_for_thread);
|
| + UpdateNudgeSource(&continue_sync_cycle, &initial_sync_for_thread);
|
|
|
| - LOG(INFO) << "Calling Sync Main at time " << now.tv_sec;
|
| - SyncMain(syncer_);
|
| - last_sync_time = now;
|
| + LOG(INFO) << "Calling Sync Main at time " << Time::Now().ToInternalValue();
|
| + SyncMain(vault_.syncer_);
|
| + last_sync_time = TimeTicks::Now();
|
|
|
| LOG(INFO) << "Updating the next polling time after SyncMain";
|
| - poll_seconds = CalculatePollingWaitTime(allstatus_->status(),
|
| - poll_seconds,
|
| - &user_idle_milliseconds,
|
| - &continue_sync_cycle);
|
| + poll_seconds = TimeDelta::FromSeconds(CalculatePollingWaitTime(
|
| + allstatus_->status(), static_cast<int>(poll_seconds.InSeconds()),
|
| + &user_idle_milliseconds, &continue_sync_cycle));
|
| }
|
| +
|
| }
|
|
|
| // We check how long the user's been idle and sync less often if the machine is
|
| // not in use. The aim is to reduce server load.
|
| +// TODO(timsteele): Should use Time(Delta).
|
| int SyncerThread::CalculatePollingWaitTime(
|
| const AllStatus::Status& status,
|
| - int last_poll_wait, // in s
|
| + int last_poll_wait, // Time in seconds.
|
| int* user_idle_milliseconds,
|
| bool* continue_sync_cycle) {
|
| bool is_continuing_sync_cyle = *continue_sync_cycle;
|
| @@ -343,30 +435,25 @@
|
| return actual_next_wait;
|
| }
|
|
|
| -void* SyncerThread::ThreadMain() {
|
| - NameCurrentThreadForDebugging("SyncEngine_SyncerThread");
|
| - mutex_.Lock();
|
| +void SyncerThread::ThreadMain() {
|
| + AutoLock lock(lock_);
|
| + // Signal Start() to let it know we've made it safely onto the message loop,
|
| + // and unblock it's caller.
|
| + thread_main_started_.Signal();
|
| ThreadMainLoop();
|
| - thread_running_ = false;
|
| - pthread_cond_broadcast(&changed_.condvar_);
|
| - mutex_.Unlock();
|
| - LOG(INFO) << "Syncer thread exiting.";
|
| - return 0;
|
| + LOG(INFO) << "Syncer thread ThreadMain is done.";
|
| }
|
|
|
| void SyncerThread::SyncMain(Syncer* syncer) {
|
| CHECK(syncer);
|
| - mutex_.Unlock();
|
| + AutoUnlock unlock(lock_);
|
| while (syncer->SyncShare()) {
|
| LOG(INFO) << "Looping in sync share";
|
| }
|
| LOG(INFO) << "Done looping in sync share";
|
| -
|
| - mutex_.Lock();
|
| }
|
|
|
| -void SyncerThread::UpdateNudgeSource(const timespec& now,
|
| - bool* continue_sync_cycle,
|
| +void SyncerThread::UpdateNudgeSource(bool* continue_sync_cycle,
|
| bool* initial_sync) {
|
| bool nudged = false;
|
| NudgeSource nudge_source = kUnknown;
|
| @@ -376,13 +463,14 @@
|
| }
|
| // Update the nudge source if a new nudge has come through during the
|
| // previous sync cycle.
|
| - while (!nudge_queue_.empty() && !(now < nudge_queue_.top().first)) {
|
| + while (!vault_.nudge_queue_.empty() &&
|
| + TimeTicks::Now() >= vault_.nudge_queue_.top().first) {
|
| if (!nudged) {
|
| - nudge_source = nudge_queue_.top().second;
|
| + nudge_source = vault_.nudge_queue_.top().second;
|
| *continue_sync_cycle = false; // Reset the continuation token on nudge.
|
| nudged = true;
|
| }
|
| - nudge_queue_.pop();
|
| + vault_.nudge_queue_.pop();
|
| }
|
| SetUpdatesSource(nudged, nudge_source, initial_sync);
|
| }
|
| @@ -413,11 +501,11 @@
|
| break;
|
| }
|
| }
|
| - syncer_->set_updates_source(updates_source);
|
| + vault_.syncer_->set_updates_source(updates_source);
|
| }
|
|
|
| void SyncerThread::HandleSyncerEvent(const SyncerEvent& event) {
|
| - MutexLock lock(&mutex_);
|
| + AutoLock lock(lock_);
|
| channel()->NotifyListeners(event);
|
| if (SyncerEvent::REQUEST_SYNC_NUDGE != event.what_happened) {
|
| return;
|
| @@ -429,33 +517,33 @@
|
| const syncable::DirectoryManagerEvent& event) {
|
| LOG(INFO) << "Handling a directory manager event";
|
| if (syncable::DirectoryManagerEvent::OPENED == event.what_happened) {
|
| - MutexLock lock(&mutex_);
|
| + AutoLock lock(lock_);
|
| LOG(INFO) << "Syncer starting up for: " << event.dirname;
|
| // The underlying database structure is ready, and we should create
|
| // the syncer.
|
| - CHECK(syncer_ == NULL);
|
| - syncer_ =
|
| + CHECK(vault_.syncer_ == NULL);
|
| + vault_.syncer_ =
|
| new Syncer(dirman_, event.dirname, scm_, model_safe_worker_.get());
|
|
|
| - syncer_->set_command_channel(command_channel_);
|
| + vault_.syncer_->set_command_channel(command_channel_);
|
| syncer_events_.reset(NewEventListenerHookup(
|
| - syncer_->channel(), this, &SyncerThread::HandleSyncerEvent));
|
| - pthread_cond_broadcast(&changed_.condvar_);
|
| + vault_.syncer_->channel(), this, &SyncerThread::HandleSyncerEvent));
|
| + vault_field_changed_.Broadcast();
|
| }
|
| }
|
|
|
| static inline void CheckConnected(bool* connected,
|
| HttpResponse::ServerConnectionCode code,
|
| - pthread_cond_t* condvar) {
|
| + ConditionVariable* condvar) {
|
| if (*connected) {
|
| if (HttpResponse::CONNECTION_UNAVAILABLE == code) {
|
| *connected = false;
|
| - pthread_cond_broadcast(condvar);
|
| + condvar->Broadcast();
|
| }
|
| } else {
|
| if (HttpResponse::SERVER_CONNECTION_OK == code) {
|
| *connected = true;
|
| - pthread_cond_broadcast(condvar);
|
| + condvar->Broadcast();
|
| }
|
| }
|
| }
|
| @@ -463,16 +551,16 @@
|
| void SyncerThread::WatchConnectionManager(ServerConnectionManager* conn_mgr) {
|
| conn_mgr_hookup_.reset(NewEventListenerHookup(conn_mgr->channel(), this,
|
| &SyncerThread::HandleServerConnectionEvent));
|
| - CheckConnected(&connected_, conn_mgr->server_status(),
|
| - &changed_.condvar_);
|
| + CheckConnected(&vault_.connected_, conn_mgr->server_status(),
|
| + &vault_field_changed_);
|
| }
|
|
|
| void SyncerThread::HandleServerConnectionEvent(
|
| const ServerConnectionEvent& event) {
|
| if (ServerConnectionEvent::STATUS_CHANGED == event.what_happened) {
|
| - MutexLock lock(&mutex_);
|
| - CheckConnected(&connected_, event.connection_code,
|
| - &changed_.condvar_);
|
| + AutoLock lock(lock_);
|
| + CheckConnected(&vault_.connected_, event.connection_code,
|
| + &vault_field_changed_);
|
| }
|
| }
|
|
|
| @@ -504,10 +592,11 @@
|
| // Called with mutex_ already locked.
|
| void SyncerThread::NudgeSyncImpl(int milliseconds_from_now,
|
| NudgeSource source) {
|
| - const timespec nudge_time = GetPThreadAbsoluteTime(milliseconds_from_now);
|
| + const TimeTicks nudge_time = TimeTicks::Now() +
|
| + TimeDelta::FromMilliseconds(milliseconds_from_now);
|
| NudgeObject nudge_object(nudge_time, source);
|
| - nudge_queue_.push(nudge_object);
|
| - pthread_cond_broadcast(&changed_.condvar_);
|
| + vault_.nudge_queue_.push(nudge_object);
|
| + vault_field_changed_.Broadcast();
|
| }
|
|
|
| void SyncerThread::WatchTalkMediator(TalkMediator* mediator) {
|
| @@ -519,7 +608,7 @@
|
| }
|
|
|
| void SyncerThread::HandleTalkMediatorEvent(const TalkMediatorEvent& event) {
|
| - MutexLock lock(&mutex_);
|
| + AutoLock lock(lock_);
|
| switch (event.what_happened) {
|
| case TalkMediatorEvent::LOGIN_SUCCEEDED:
|
| LOG(INFO) << "P2P: Login succeeded.";
|
| @@ -532,7 +621,7 @@
|
| case TalkMediatorEvent::SUBSCRIPTIONS_ON:
|
| LOG(INFO) << "P2P: Subscriptions successfully enabled.";
|
| p2p_subscribed_ = true;
|
| - if (NULL != syncer_) {
|
| + if (NULL != vault_.syncer_) {
|
| LOG(INFO) << "Subscriptions on. Nudging syncer for initial push.";
|
| NudgeSyncImpl(0, kLocal);
|
| }
|
| @@ -543,7 +632,7 @@
|
| break;
|
| case TalkMediatorEvent::NOTIFICATION_RECEIVED:
|
| LOG(INFO) << "P2P: Updates on server, pushing syncer";
|
| - if (NULL != syncer_) {
|
| + if (NULL != vault_.syncer_) {
|
| NudgeSyncImpl(0, kNotification);
|
| }
|
| break;
|
| @@ -551,8 +640,9 @@
|
| break;
|
| }
|
|
|
| - if (NULL != syncer_) {
|
| - syncer_->set_notifications_enabled(p2p_authenticated_ && p2p_subscribed_);
|
| + if (NULL != vault_.syncer_) {
|
| + vault_.syncer_->set_notifications_enabled(
|
| + p2p_authenticated_ && p2p_subscribed_);
|
| }
|
| }
|
|
|
|
|