Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(6064)

Unified Diff: chrome/browser/sync/engine/syncapi.cc

Issue 6621062: Refactor sync notifier out of sync api. (Closed) Base URL: http://git.chromium.org/git/chromium.git@trunk
Patch Set: Minox fixes. Created 9 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « chrome/browser/sync/engine/syncapi.h ('k') | chrome/browser/sync/engine/syncapi_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: chrome/browser/sync/engine/syncapi.cc
diff --git a/chrome/browser/sync/engine/syncapi.cc b/chrome/browser/sync/engine/syncapi.cc
index 894bd1e48385dfec3f75b2e452773ab5870b306e..2668ec3c3eb62467e710ef3920534387dc88f1d3 100644
--- a/chrome/browser/sync/engine/syncapi.cc
+++ b/chrome/browser/sync/engine/syncapi.cc
@@ -38,8 +38,9 @@
#include "chrome/browser/sync/js_arg_list.h"
#include "chrome/browser/sync/js_backend.h"
#include "chrome/browser/sync/js_event_router.h"
-#include "chrome/browser/sync/notifier/server_notifier_thread.h"
-#include "chrome/browser/sync/notifier/state_writer.h"
+#include "chrome/browser/sync/notifier/sync_notifier.h"
+#include "chrome/browser/sync/notifier/sync_notifier_factory.h"
+#include "chrome/browser/sync/notifier/sync_notifier_observer.h"
#include "chrome/browser/sync/protocol/app_specifics.pb.h"
#include "chrome/browser/sync/protocol/autofill_specifics.pb.h"
#include "chrome/browser/sync/protocol/bookmark_specifics.pb.h"
@@ -63,10 +64,6 @@
#include "chrome/common/deprecated/event_sys.h"
#include "chrome/common/net/gaia/gaia_authenticator.h"
#include "content/browser/browser_thread.h"
-#include "jingle/notifier/listener/mediator_thread_impl.h"
-#include "jingle/notifier/listener/notification_constants.h"
-#include "jingle/notifier/listener/talk_mediator.h"
-#include "jingle/notifier/listener/talk_mediator_impl.h"
#include "net/base/network_change_notifier.h"
using browser_sync::AllStatus;
@@ -83,8 +80,6 @@ using browser_sync::SyncerThread;
using browser_sync::SyncerThreadAdapter;
using browser_sync::kNigoriTag;
using browser_sync::sessions::SyncSessionContext;
-using notifier::TalkMediator;
-using notifier::TalkMediatorImpl;
using std::list;
using std::hex;
using std::string;
@@ -93,6 +88,7 @@ using syncable::Directory;
using syncable::DirectoryManager;
using syncable::Entry;
using syncable::SPECIFICS;
+using sync_notifier::SyncNotifierFactory;
using sync_pb::AutofillProfileSpecifics;
typedef GoogleServiceAuthError AuthError;
@@ -1113,8 +1109,7 @@ const sync_pb::PasswordSpecificsData&
// SyncManager's implementation: SyncManager::SyncInternal
class SyncManager::SyncInternal
: public net::NetworkChangeNotifier::IPAddressObserver,
- public TalkMediator::Delegate,
- public sync_notifier::StateWriter,
+ public sync_notifier::SyncNotifierObserver,
public browser_sync::ChannelEventHandler<syncable::DirectoryChangeEvent>,
public browser_sync::JsBackend,
public SyncEngineEventListener {
@@ -1126,10 +1121,8 @@ class SyncManager::SyncInternal
parent_router_(NULL),
sync_manager_(sync_manager),
registrar_(NULL),
- notification_pending_(false),
initialized_(false),
- ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)),
- server_notifier_thread_(NULL) {
+ ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
}
@@ -1146,7 +1139,6 @@ class SyncManager::SyncInternal
ModelSafeWorkerRegistrar* model_safe_worker_registrar,
const char* user_agent,
const SyncCredentials& credentials,
- const notifier::NotifierOptions& notifier_options,
const std::string& restored_key_for_bootstrapping,
bool setup_for_test_mode);
@@ -1200,21 +1192,14 @@ class SyncManager::SyncInternal
// Open the directory named with username_for_share
bool OpenDirectory();
- // Login to the talk mediator with the given credentials.
- void TalkMediatorLogin(
- const std::string& email, const std::string& token);
-
- // TalkMediator::Delegate implementation.
+ // SyncNotifierObserver implementation.
virtual void OnNotificationStateChange(
bool notifications_enabled);
virtual void OnIncomingNotification(
- const IncomingNotificationData& notification_data);
-
- virtual void OnOutgoingNotification();
+ const browser_sync::sessions::TypePayloadMap& type_payloads);
- // sync_notifier::StateWriter implementation.
- virtual void WriteState(const std::string& state);
+ virtual void StoreState(const std::string& cookie);
void AddObserver(SyncManager::Observer* observer);
@@ -1226,7 +1211,6 @@ class SyncManager::SyncInternal
return connection_manager_.get();
}
SyncerThreadAdapter* syncer_thread() { return syncer_thread_.get(); }
- TalkMediator* talk_mediator() { return talk_mediator_.get(); }
UserShare* GetUserShare() { return &share_; }
// Return the currently active (validated) username for use with syncable
@@ -1334,10 +1318,6 @@ class SyncManager::SyncInternal
const browser_sync::JsEventHandler* sender);
private:
- // Helper to handle the details of initializing the TalkMediator.
- // Must be called only after OpenDirectory() is called.
- void InitializeTalkMediator();
-
// Helper to call OnAuthError when no authentication credentials are
// available.
void RaiseAuthNeededEvent();
@@ -1347,10 +1327,8 @@ class SyncManager::SyncInternal
// already initialized, this is a no-op.
void MarkAndNotifyInitializationComplete();
- // If there's a pending notification to be sent, either from the
- // new_pending_notification flag or a previous unsuccessfully sent
- // notification, tries to send a notification.
- void SendPendingXMPPNotification(bool new_pending_notification);
+ // Sends notifications to peers.
+ void SendNotification();
// Determine if the parents or predecessors differ between the old and new
// versions of an entry stored in |a| and |b|. Note that a node's index may
@@ -1467,8 +1445,8 @@ class SyncManager::SyncInternal
// The thread that runs the Syncer. Needs to be explicitly Start()ed.
scoped_ptr<SyncerThreadAdapter> syncer_thread_;
- // Notification (xmpp) handler.
- scoped_ptr<TalkMediator> talk_mediator_;
+ // The SyncNotifier which notifies us when updates need to be downloaded.
+ scoped_ptr<sync_notifier::SyncNotifier> sync_notifier_;
// A multi-purpose status watch object that aggregates stats from various
// sync components.
@@ -1503,9 +1481,6 @@ class SyncManager::SyncInternal
// The instance is shared between the SyncManager and the Syncer.
ModelSafeWorkerRegistrar* registrar_;
- // True if the next SyncCycle should notify peers of an update.
- bool notification_pending_;
-
// Set to true once Init has been called, and we know of an authenticated
// valid) username either from a fresh authentication attempt (as in
// first-use case) or from a previous attempt stored in our UserSettings
@@ -1515,17 +1490,11 @@ class SyncManager::SyncInternal
bool initialized_;
mutable base::Lock initialized_mutex_;
- notifier::NotifierOptions notifier_options_;
-
// True if the SyncManager should be running in test mode (no syncer thread
// actually communicating with the server).
bool setup_for_test_mode_;
- syncable::ModelTypeSet enabled_types_;
-
ScopedRunnableMethodFactory<SyncManager::SyncInternal> method_factory_;
-
- sync_notifier::ServerNotifierThread* server_notifier_thread_;
};
const int SyncManager::SyncInternal::kDefaultNudgeDelayMilliseconds = 200;
const int SyncManager::SyncInternal::kPreferencesNudgeDelayMilliseconds = 2000;
@@ -1544,7 +1513,6 @@ bool SyncManager::Init(const FilePath& database_location,
ModelSafeWorkerRegistrar* registrar,
const char* user_agent,
const SyncCredentials& credentials,
- const notifier::NotifierOptions& notifier_options,
const std::string& restored_key_for_bootstrapping,
bool setup_for_test_mode) {
DCHECK(post_factory);
@@ -1558,7 +1526,6 @@ bool SyncManager::Init(const FilePath& database_location,
registrar,
user_agent,
credentials,
- notifier_options,
restored_key_for_bootstrapping,
setup_for_test_mode);
}
@@ -1660,7 +1627,6 @@ bool SyncManager::SyncInternal::Init(
ModelSafeWorkerRegistrar* model_safe_worker_registrar,
const char* user_agent,
const SyncCredentials& credentials,
- const notifier::NotifierOptions& notifier_options,
const std::string& restored_key_for_bootstrapping,
bool setup_for_test_mode) {
@@ -1668,7 +1634,6 @@ bool SyncManager::SyncInternal::Init(
core_message_loop_ = MessageLoop::current();
DCHECK(core_message_loop_);
- notifier_options_ = notifier_options;
registrar_ = model_safe_worker_registrar;
setup_for_test_mode_ = setup_for_test_mode;
@@ -1782,38 +1747,14 @@ void SyncManager::SyncInternal::MarkAndNotifyInitializationComplete() {
OnInitializationComplete());
}
-void SyncManager::SyncInternal::SendPendingXMPPNotification(
- bool new_pending_notification) {
+void SyncManager::SyncInternal::SendNotification() {
DCHECK_EQ(MessageLoop::current(), core_message_loop_);
- DCHECK_NE(notifier_options_.notification_method,
- notifier::NOTIFICATION_SERVER);
- notification_pending_ = notification_pending_ || new_pending_notification;
- if (!notification_pending_) {
- VLOG(1) << "Not sending notification: no pending notification";
+ if (!sync_notifier_.get()) {
+ VLOG(1) << "Not sending notification: sync_notifier_ is NULL";
return;
}
- if (!talk_mediator()) {
- VLOG(1) << "Not sending notification: shutting down (talk_mediator_ is "
- "NULL)";
- return;
- }
- VLOG(1) << "Sending XMPP notification...";
- OutgoingNotificationData notification_data;
- notification_data.service_id = browser_sync::kSyncServiceId;
- notification_data.service_url = browser_sync::kSyncServiceUrl;
- notification_data.send_content = true;
- notification_data.priority = browser_sync::kSyncPriority;
- notification_data.write_to_cache_only = true;
- notification_data.service_specific_data =
- browser_sync::kSyncServiceSpecificData;
- notification_data.require_subscription = true;
- bool success = talk_mediator()->SendNotification(notification_data);
- if (success) {
- notification_pending_ = false;
- VLOG(1) << "Sent XMPP notification";
- } else {
- VLOG(1) << "Could not send XMPP notification";
- }
+ allstatus_.IncrementNotificationsSent();
+ sync_notifier_->SendNotification();
}
bool SyncManager::SyncInternal::OpenDirectory() {
@@ -1855,6 +1796,29 @@ bool SyncManager::SyncInternal::SignIn(const SyncCredentials& credentials) {
if (!OpenDirectory())
return false;
+ // Initialize the sync notifier. This should be done only after OpenDirectory
+ // is called, as we also need to set the state.
+ // TODO(nileshagrawal): Pass SyncNotifierFactory as an argument to Init
+ // to improve testability.
+ SyncNotifierFactory sync_notifier_factory;
+ sync_notifier_.reset(sync_notifier_factory.CreateSyncNotifier(
+ *CommandLine::ForCurrentProcess()));
+ sync_notifier_->AddObserver(this);
+
+ syncable::ScopedDirLookup lookup(dir_manager(), username_for_share());
+ std::string state;
+ if (lookup.good()) {
+ state = lookup->GetAndClearNotificationState();
+ } else {
+ LOG(ERROR) << "Could not read notification state";
+ }
+ if (VLOG_IS_ON(1)) {
+ std::string encoded_state;
+ base::Base64Encode(state, &encoded_state);
+ VLOG(1) << "Read notification state: " << encoded_state;
+ }
+ sync_notifier_->SetState(state);
+
if (!setup_for_test_mode_) {
UpdateCredentials(credentials);
}
@@ -1865,8 +1829,11 @@ void SyncManager::SyncInternal::UpdateCredentials(
const SyncCredentials& credentials) {
DCHECK_EQ(MessageLoop::current(), core_message_loop_);
DCHECK_EQ(credentials.email, share_.name);
+ DCHECK(!credentials.email.empty());
+ DCHECK(!credentials.sync_token.empty());
connection_manager()->set_auth_token(credentials.sync_token);
- TalkMediatorLogin(credentials.email, credentials.sync_token);
+ sync_notifier_->UpdateCredentials(
+ credentials.email, credentials.sync_token);
CheckServerReachable();
}
@@ -1874,51 +1841,7 @@ void SyncManager::SyncInternal::UpdateEnabledTypes(
const syncable::ModelTypeSet& types) {
DCHECK_EQ(MessageLoop::current(), core_message_loop_);
- enabled_types_ = types;
- if (server_notifier_thread_ != NULL) {
- server_notifier_thread_->UpdateEnabledTypes(types);
- }
-}
-
-void SyncManager::SyncInternal::InitializeTalkMediator() {
- if (notifier_options_.notification_method ==
- notifier::NOTIFICATION_SERVER) {
- syncable::ScopedDirLookup lookup(dir_manager(), username_for_share());
- std::string state;
- if (lookup.good())
- state = lookup->GetAndClearNotificationState();
- else
- LOG(ERROR) << "Could not read notification state";
- if (VLOG_IS_ON(1)) {
- std::string encoded_state;
- base::Base64Encode(state, &encoded_state);
- VLOG(1) << "Read notification state: " << encoded_state;
- }
-
- // |talk_mediator_| takes ownership of |sync_notifier_thread_|
- // but it is. guaranteed that |sync_notifier_thread_| is destroyed only
- // when |talk_mediator_| is (see the comments in talk_mediator.h).
- server_notifier_thread_ = new sync_notifier::ServerNotifierThread(
- notifier_options_, state, this);
- talk_mediator_.reset(
- new TalkMediatorImpl(server_notifier_thread_,
- notifier_options_.invalidate_xmpp_login,
- notifier_options_.allow_insecure_connection));
-
- // Since we may be initialized more than once, make sure that any
- // newly created server notifier thread has the latest enabled types.
- server_notifier_thread_->UpdateEnabledTypes(enabled_types_);
- } else {
- notifier::MediatorThread* mediator_thread =
- new notifier::MediatorThreadImpl(notifier_options_);
- talk_mediator_.reset(
- new TalkMediatorImpl(mediator_thread,
- notifier_options_.invalidate_xmpp_login,
- notifier_options_.allow_insecure_connection));
- talk_mediator_->AddSubscribedServiceUrl(browser_sync::kSyncServiceUrl);
- server_notifier_thread_ = NULL;
- }
- talk_mediator()->SetDelegate(this);
+ sync_notifier_->UpdateEnabledTypes(types);
}
void SyncManager::SyncInternal::RaiseAuthNeededEvent() {
@@ -2136,12 +2059,6 @@ void SyncManager::Shutdown() {
void SyncManager::SyncInternal::Shutdown() {
method_factory_.RevokeAll();
- // We NULL out talk_mediator_ so that any tasks pumped below do not
- // trigger further XMPP actions.
- //
- // TODO(akalin): NULL the other member variables defensively, too.
- scoped_ptr<TalkMediator> talk_mediator(talk_mediator_.release());
-
if (syncer_thread()) {
if (!syncer_thread()->Stop(kThreadExitTimeoutMsec)) {
LOG(FATAL) << "Unable to stop the syncer, it won't be happy...";
@@ -2149,17 +2066,12 @@ void SyncManager::SyncInternal::Shutdown() {
syncer_thread_.reset();
}
- // Shutdown the xmpp buzz connection.
- if (talk_mediator.get()) {
- VLOG(1) << "P2P: Mediator logout started.";
- talk_mediator->Logout();
- VLOG(1) << "P2P: Mediator logout completed.";
- talk_mediator.reset();
-
- // |server_notifier_thread_| is owned by |talk_mediator|. We NULL
- // it out here so as to not have a dangling pointer.
- server_notifier_thread_= NULL;
- VLOG(1) << "P2P: Mediator destroyed.";
+ // We NULL out sync_notifer_ so that any pending tasks do not
+ // trigger further notifications.
+ // TODO(akalin): NULL the other member variables defensively, too.
+ if (sync_notifier_.get()) {
+ sync_notifier_->RemoveObserver(this);
+ sync_notifier_.reset();
}
// Pump any messages the auth watcher, syncer thread, or talk
@@ -2487,18 +2399,19 @@ void SyncManager::SyncInternal::OnSyncEngineEvent(
OnSyncCycleCompleted(event.snapshot));
}
- if (notifier_options_.notification_method !=
- notifier::NOTIFICATION_SERVER) {
- // TODO(chron): Consider changing this back to track has_more_to_sync
- // only notify peers if a successful commit has occurred.
- bool new_pending_notification =
- (event.snapshot->syncer_status.num_successful_commits > 0);
+ // This is here for tests, which are still using p2p notifications.
+ // SendNotification does not do anything if we are using server based
+ // notifications.
+ // TODO(chron): Consider changing this back to track has_more_to_sync
+ // only notify peers if a successful commit has occurred.
+ bool new_notification =
+ (event.snapshot->syncer_status.num_successful_commits > 0);
+ if (new_notification) {
core_message_loop_->PostTask(
FROM_HERE,
NewRunnableMethod(
this,
- &SyncManager::SyncInternal::SendPendingXMPPNotification,
- new_pending_notification));
+ &SyncManager::SyncInternal::SendNotification));
}
}
@@ -2646,87 +2559,16 @@ void SyncManager::SyncInternal::OnNotificationStateChange(
parent_router_->RouteJsEvent("onSyncNotificationStateChange",
browser_sync::JsArgList(args), NULL);
}
- if ((notifier_options_.notification_method !=
- notifier::NOTIFICATION_SERVER) && notifications_enabled) {
- // Nudge the syncer thread when notifications are enabled, in case there is
- // any data that has not yet been synced. If we are listening to
- // server-issued notifications, we are already guaranteed to receive a
- // notification on a successful connection.
- if (syncer_thread()) {
- syncer_thread()->NudgeSyncer(0, SyncerThread::kLocal);
- }
-
- // Send a notification as soon as subscriptions are on
- // (see http://code.google.com/p/chromium/issues/detail?id=38563 ).
- core_message_loop_->PostTask(
- FROM_HERE,
- NewRunnableMethod(
- this,
- &SyncManager::SyncInternal::SendPendingXMPPNotification,
- true));
- }
-}
-
-void SyncManager::SyncInternal::TalkMediatorLogin(
- const std::string& email, const std::string& token) {
- DCHECK_EQ(MessageLoop::current(), core_message_loop_);
- DCHECK(!email.empty());
- DCHECK(!token.empty());
- InitializeTalkMediator();
- talk_mediator()->SetAuthToken(email, token, SYNC_SERVICE_NAME);
- talk_mediator()->Login();
}
void SyncManager::SyncInternal::OnIncomingNotification(
- const IncomingNotificationData& notification_data) {
- browser_sync::sessions::TypePayloadMap model_types_with_payloads;
-
- // Check if the service url is a sync URL. An empty service URL is
- // treated as a legacy sync notification. If we're listening to
- // server-issued notifications, no need to check the service_url.
- if (notifier_options_.notification_method ==
- notifier::NOTIFICATION_SERVER) {
- VLOG(1) << "Sync received server notification from " <<
- notification_data.service_url << ": " <<
- notification_data.service_specific_data;
- syncable::ModelTypeBitSet model_types;
- const std::string& model_type_list = notification_data.service_url;
- const std::string& notification_payload =
- notification_data.service_specific_data;
-
- if (!syncable::ModelTypeBitSetFromString(model_type_list, &model_types)) {
- LOG(DFATAL) << "Could not extract model types from server data.";
- model_types.set();
- }
-
- model_types_with_payloads =
- browser_sync::sessions::MakeTypePayloadMapFromBitSet(model_types,
- notification_payload);
- } else if (notification_data.service_url.empty() ||
- (notification_data.service_url ==
- browser_sync::kSyncLegacyServiceUrl) ||
- (notification_data.service_url ==
- browser_sync::kSyncServiceUrl)) {
- VLOG(1) << "Sync received P2P notification.";
-
- // Catch for sync integration tests (uses p2p). Just set all enabled
- // datatypes.
- ModelSafeRoutingInfo routes;
- registrar_->GetModelSafeRoutingInfo(&routes);
- model_types_with_payloads =
- browser_sync::sessions::MakeTypePayloadMapFromRoutingInfo(routes,
- std::string());
- } else {
- LOG(WARNING) << "Notification fron unexpected source: "
- << notification_data.service_url;
- }
-
- if (!model_types_with_payloads.empty()) {
+ const browser_sync::sessions::TypePayloadMap& type_payloads) {
+ if (!type_payloads.empty()) {
if (syncer_thread()) {
syncer_thread()->NudgeSyncerWithPayloads(
kSyncerThreadDelayMsec,
SyncerThread::kNotification,
- model_types_with_payloads);
+ type_payloads);
}
allstatus_.IncrementNotificationsReceived();
} else {
@@ -2738,8 +2580,8 @@ void SyncManager::SyncInternal::OnIncomingNotification(
ListValue* changed_types = new ListValue();
args.Append(changed_types);
for (browser_sync::sessions::TypePayloadMap::const_iterator
- it = model_types_with_payloads.begin();
- it != model_types_with_payloads.end(); ++it) {
+ it = type_payloads.begin();
+ it != type_payloads.end(); ++it) {
const std::string& model_type_str =
syncable::ModelTypeToString(it->first);
changed_types->Append(Value::CreateStringValue(model_type_str));
@@ -2749,13 +2591,8 @@ void SyncManager::SyncInternal::OnIncomingNotification(
}
}
-void SyncManager::SyncInternal::OnOutgoingNotification() {
- DCHECK_NE(notifier_options_.notification_method,
- notifier::NOTIFICATION_SERVER);
- allstatus_.IncrementNotificationsSent();
-}
-
-void SyncManager::SyncInternal::WriteState(const std::string& state) {
+void SyncManager::SyncInternal::StoreState(
+ const std::string& state) {
syncable::ScopedDirLookup lookup(dir_manager(), username_for_share());
if (!lookup.good()) {
LOG(ERROR) << "Could not write notification state";
@@ -2837,10 +2674,11 @@ void SyncManager::TriggerOnNotificationStateChangeForTest(
void SyncManager::TriggerOnIncomingNotificationForTest(
const syncable::ModelTypeBitSet& model_types) {
- IncomingNotificationData notification_data;
- notification_data.service_url = model_types.to_string();
- // Here we rely on the default notification method being SERVER.
- data_->OnIncomingNotification(notification_data);
+ browser_sync::sessions::TypePayloadMap model_types_with_payloads =
+ browser_sync::sessions::MakeTypePayloadMapFromBitSet(model_types,
+ std::string());
+
+ data_->OnIncomingNotification(model_types_with_payloads);
}
} // namespace sync_api
« no previous file with comments | « chrome/browser/sync/engine/syncapi.h ('k') | chrome/browser/sync/engine/syncapi_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698