Index: chrome/browser/sync/engine/syncapi.cc |
diff --git a/chrome/browser/sync/engine/syncapi.cc b/chrome/browser/sync/engine/syncapi.cc |
index 894bd1e48385dfec3f75b2e452773ab5870b306e..2668ec3c3eb62467e710ef3920534387dc88f1d3 100644 |
--- a/chrome/browser/sync/engine/syncapi.cc |
+++ b/chrome/browser/sync/engine/syncapi.cc |
@@ -38,8 +38,9 @@ |
#include "chrome/browser/sync/js_arg_list.h" |
#include "chrome/browser/sync/js_backend.h" |
#include "chrome/browser/sync/js_event_router.h" |
-#include "chrome/browser/sync/notifier/server_notifier_thread.h" |
-#include "chrome/browser/sync/notifier/state_writer.h" |
+#include "chrome/browser/sync/notifier/sync_notifier.h" |
+#include "chrome/browser/sync/notifier/sync_notifier_factory.h" |
+#include "chrome/browser/sync/notifier/sync_notifier_observer.h" |
#include "chrome/browser/sync/protocol/app_specifics.pb.h" |
#include "chrome/browser/sync/protocol/autofill_specifics.pb.h" |
#include "chrome/browser/sync/protocol/bookmark_specifics.pb.h" |
@@ -63,10 +64,6 @@ |
#include "chrome/common/deprecated/event_sys.h" |
#include "chrome/common/net/gaia/gaia_authenticator.h" |
#include "content/browser/browser_thread.h" |
-#include "jingle/notifier/listener/mediator_thread_impl.h" |
-#include "jingle/notifier/listener/notification_constants.h" |
-#include "jingle/notifier/listener/talk_mediator.h" |
-#include "jingle/notifier/listener/talk_mediator_impl.h" |
#include "net/base/network_change_notifier.h" |
using browser_sync::AllStatus; |
@@ -83,8 +80,6 @@ using browser_sync::SyncerThread; |
using browser_sync::SyncerThreadAdapter; |
using browser_sync::kNigoriTag; |
using browser_sync::sessions::SyncSessionContext; |
-using notifier::TalkMediator; |
-using notifier::TalkMediatorImpl; |
using std::list; |
using std::hex; |
using std::string; |
@@ -93,6 +88,7 @@ using syncable::Directory; |
using syncable::DirectoryManager; |
using syncable::Entry; |
using syncable::SPECIFICS; |
+using sync_notifier::SyncNotifierFactory; |
using sync_pb::AutofillProfileSpecifics; |
typedef GoogleServiceAuthError AuthError; |
@@ -1113,8 +1109,7 @@ const sync_pb::PasswordSpecificsData& |
// SyncManager's implementation: SyncManager::SyncInternal |
class SyncManager::SyncInternal |
: public net::NetworkChangeNotifier::IPAddressObserver, |
- public TalkMediator::Delegate, |
- public sync_notifier::StateWriter, |
+ public sync_notifier::SyncNotifierObserver, |
public browser_sync::ChannelEventHandler<syncable::DirectoryChangeEvent>, |
public browser_sync::JsBackend, |
public SyncEngineEventListener { |
@@ -1126,10 +1121,8 @@ class SyncManager::SyncInternal |
parent_router_(NULL), |
sync_manager_(sync_manager), |
registrar_(NULL), |
- notification_pending_(false), |
initialized_(false), |
- ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)), |
- server_notifier_thread_(NULL) { |
+ ALLOW_THIS_IN_INITIALIZER_LIST(method_factory_(this)) { |
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
} |
@@ -1146,7 +1139,6 @@ class SyncManager::SyncInternal |
ModelSafeWorkerRegistrar* model_safe_worker_registrar, |
const char* user_agent, |
const SyncCredentials& credentials, |
- const notifier::NotifierOptions& notifier_options, |
const std::string& restored_key_for_bootstrapping, |
bool setup_for_test_mode); |
@@ -1200,21 +1192,14 @@ class SyncManager::SyncInternal |
// Open the directory named with username_for_share |
bool OpenDirectory(); |
- // Login to the talk mediator with the given credentials. |
- void TalkMediatorLogin( |
- const std::string& email, const std::string& token); |
- |
- // TalkMediator::Delegate implementation. |
+ // SyncNotifierObserver implementation. |
virtual void OnNotificationStateChange( |
bool notifications_enabled); |
virtual void OnIncomingNotification( |
- const IncomingNotificationData& notification_data); |
- |
- virtual void OnOutgoingNotification(); |
+ const browser_sync::sessions::TypePayloadMap& type_payloads); |
- // sync_notifier::StateWriter implementation. |
- virtual void WriteState(const std::string& state); |
+ virtual void StoreState(const std::string& cookie); |
void AddObserver(SyncManager::Observer* observer); |
@@ -1226,7 +1211,6 @@ class SyncManager::SyncInternal |
return connection_manager_.get(); |
} |
SyncerThreadAdapter* syncer_thread() { return syncer_thread_.get(); } |
- TalkMediator* talk_mediator() { return talk_mediator_.get(); } |
UserShare* GetUserShare() { return &share_; } |
// Return the currently active (validated) username for use with syncable |
@@ -1334,10 +1318,6 @@ class SyncManager::SyncInternal |
const browser_sync::JsEventHandler* sender); |
private: |
- // Helper to handle the details of initializing the TalkMediator. |
- // Must be called only after OpenDirectory() is called. |
- void InitializeTalkMediator(); |
- |
// Helper to call OnAuthError when no authentication credentials are |
// available. |
void RaiseAuthNeededEvent(); |
@@ -1347,10 +1327,8 @@ class SyncManager::SyncInternal |
// already initialized, this is a no-op. |
void MarkAndNotifyInitializationComplete(); |
- // If there's a pending notification to be sent, either from the |
- // new_pending_notification flag or a previous unsuccessfully sent |
- // notification, tries to send a notification. |
- void SendPendingXMPPNotification(bool new_pending_notification); |
+ // Sends notifications to peers. |
+ void SendNotification(); |
// Determine if the parents or predecessors differ between the old and new |
// versions of an entry stored in |a| and |b|. Note that a node's index may |
@@ -1467,8 +1445,8 @@ class SyncManager::SyncInternal |
// The thread that runs the Syncer. Needs to be explicitly Start()ed. |
scoped_ptr<SyncerThreadAdapter> syncer_thread_; |
- // Notification (xmpp) handler. |
- scoped_ptr<TalkMediator> talk_mediator_; |
+ // The SyncNotifier which notifies us when updates need to be downloaded. |
+ scoped_ptr<sync_notifier::SyncNotifier> sync_notifier_; |
// A multi-purpose status watch object that aggregates stats from various |
// sync components. |
@@ -1503,9 +1481,6 @@ class SyncManager::SyncInternal |
// The instance is shared between the SyncManager and the Syncer. |
ModelSafeWorkerRegistrar* registrar_; |
- // True if the next SyncCycle should notify peers of an update. |
- bool notification_pending_; |
- |
// Set to true once Init has been called, and we know of an authenticated |
// valid) username either from a fresh authentication attempt (as in |
// first-use case) or from a previous attempt stored in our UserSettings |
@@ -1515,17 +1490,11 @@ class SyncManager::SyncInternal |
bool initialized_; |
mutable base::Lock initialized_mutex_; |
- notifier::NotifierOptions notifier_options_; |
- |
// True if the SyncManager should be running in test mode (no syncer thread |
// actually communicating with the server). |
bool setup_for_test_mode_; |
- syncable::ModelTypeSet enabled_types_; |
- |
ScopedRunnableMethodFactory<SyncManager::SyncInternal> method_factory_; |
- |
- sync_notifier::ServerNotifierThread* server_notifier_thread_; |
}; |
const int SyncManager::SyncInternal::kDefaultNudgeDelayMilliseconds = 200; |
const int SyncManager::SyncInternal::kPreferencesNudgeDelayMilliseconds = 2000; |
@@ -1544,7 +1513,6 @@ bool SyncManager::Init(const FilePath& database_location, |
ModelSafeWorkerRegistrar* registrar, |
const char* user_agent, |
const SyncCredentials& credentials, |
- const notifier::NotifierOptions& notifier_options, |
const std::string& restored_key_for_bootstrapping, |
bool setup_for_test_mode) { |
DCHECK(post_factory); |
@@ -1558,7 +1526,6 @@ bool SyncManager::Init(const FilePath& database_location, |
registrar, |
user_agent, |
credentials, |
- notifier_options, |
restored_key_for_bootstrapping, |
setup_for_test_mode); |
} |
@@ -1660,7 +1627,6 @@ bool SyncManager::SyncInternal::Init( |
ModelSafeWorkerRegistrar* model_safe_worker_registrar, |
const char* user_agent, |
const SyncCredentials& credentials, |
- const notifier::NotifierOptions& notifier_options, |
const std::string& restored_key_for_bootstrapping, |
bool setup_for_test_mode) { |
@@ -1668,7 +1634,6 @@ bool SyncManager::SyncInternal::Init( |
core_message_loop_ = MessageLoop::current(); |
DCHECK(core_message_loop_); |
- notifier_options_ = notifier_options; |
registrar_ = model_safe_worker_registrar; |
setup_for_test_mode_ = setup_for_test_mode; |
@@ -1782,38 +1747,14 @@ void SyncManager::SyncInternal::MarkAndNotifyInitializationComplete() { |
OnInitializationComplete()); |
} |
-void SyncManager::SyncInternal::SendPendingXMPPNotification( |
- bool new_pending_notification) { |
+void SyncManager::SyncInternal::SendNotification() { |
DCHECK_EQ(MessageLoop::current(), core_message_loop_); |
- DCHECK_NE(notifier_options_.notification_method, |
- notifier::NOTIFICATION_SERVER); |
- notification_pending_ = notification_pending_ || new_pending_notification; |
- if (!notification_pending_) { |
- VLOG(1) << "Not sending notification: no pending notification"; |
+ if (!sync_notifier_.get()) { |
+ VLOG(1) << "Not sending notification: sync_notifier_ is NULL"; |
return; |
} |
- if (!talk_mediator()) { |
- VLOG(1) << "Not sending notification: shutting down (talk_mediator_ is " |
- "NULL)"; |
- return; |
- } |
- VLOG(1) << "Sending XMPP notification..."; |
- OutgoingNotificationData notification_data; |
- notification_data.service_id = browser_sync::kSyncServiceId; |
- notification_data.service_url = browser_sync::kSyncServiceUrl; |
- notification_data.send_content = true; |
- notification_data.priority = browser_sync::kSyncPriority; |
- notification_data.write_to_cache_only = true; |
- notification_data.service_specific_data = |
- browser_sync::kSyncServiceSpecificData; |
- notification_data.require_subscription = true; |
- bool success = talk_mediator()->SendNotification(notification_data); |
- if (success) { |
- notification_pending_ = false; |
- VLOG(1) << "Sent XMPP notification"; |
- } else { |
- VLOG(1) << "Could not send XMPP notification"; |
- } |
+ allstatus_.IncrementNotificationsSent(); |
+ sync_notifier_->SendNotification(); |
} |
bool SyncManager::SyncInternal::OpenDirectory() { |
@@ -1855,6 +1796,29 @@ bool SyncManager::SyncInternal::SignIn(const SyncCredentials& credentials) { |
if (!OpenDirectory()) |
return false; |
+ // Initialize the sync notifier. This should be done only after OpenDirectory |
+ // is called, as we also need to set the state. |
+ // TODO(nileshagrawal): Pass SyncNotifierFactory as an argument to Init |
+ // to improve testability. |
+ SyncNotifierFactory sync_notifier_factory; |
+ sync_notifier_.reset(sync_notifier_factory.CreateSyncNotifier( |
+ *CommandLine::ForCurrentProcess())); |
+ sync_notifier_->AddObserver(this); |
+ |
+ syncable::ScopedDirLookup lookup(dir_manager(), username_for_share()); |
+ std::string state; |
+ if (lookup.good()) { |
+ state = lookup->GetAndClearNotificationState(); |
+ } else { |
+ LOG(ERROR) << "Could not read notification state"; |
+ } |
+ if (VLOG_IS_ON(1)) { |
+ std::string encoded_state; |
+ base::Base64Encode(state, &encoded_state); |
+ VLOG(1) << "Read notification state: " << encoded_state; |
+ } |
+ sync_notifier_->SetState(state); |
+ |
if (!setup_for_test_mode_) { |
UpdateCredentials(credentials); |
} |
@@ -1865,8 +1829,11 @@ void SyncManager::SyncInternal::UpdateCredentials( |
const SyncCredentials& credentials) { |
DCHECK_EQ(MessageLoop::current(), core_message_loop_); |
DCHECK_EQ(credentials.email, share_.name); |
+ DCHECK(!credentials.email.empty()); |
+ DCHECK(!credentials.sync_token.empty()); |
connection_manager()->set_auth_token(credentials.sync_token); |
- TalkMediatorLogin(credentials.email, credentials.sync_token); |
+ sync_notifier_->UpdateCredentials( |
+ credentials.email, credentials.sync_token); |
CheckServerReachable(); |
} |
@@ -1874,51 +1841,7 @@ void SyncManager::SyncInternal::UpdateEnabledTypes( |
const syncable::ModelTypeSet& types) { |
DCHECK_EQ(MessageLoop::current(), core_message_loop_); |
- enabled_types_ = types; |
- if (server_notifier_thread_ != NULL) { |
- server_notifier_thread_->UpdateEnabledTypes(types); |
- } |
-} |
- |
-void SyncManager::SyncInternal::InitializeTalkMediator() { |
- if (notifier_options_.notification_method == |
- notifier::NOTIFICATION_SERVER) { |
- syncable::ScopedDirLookup lookup(dir_manager(), username_for_share()); |
- std::string state; |
- if (lookup.good()) |
- state = lookup->GetAndClearNotificationState(); |
- else |
- LOG(ERROR) << "Could not read notification state"; |
- if (VLOG_IS_ON(1)) { |
- std::string encoded_state; |
- base::Base64Encode(state, &encoded_state); |
- VLOG(1) << "Read notification state: " << encoded_state; |
- } |
- |
- // |talk_mediator_| takes ownership of |sync_notifier_thread_| |
- // but it is. guaranteed that |sync_notifier_thread_| is destroyed only |
- // when |talk_mediator_| is (see the comments in talk_mediator.h). |
- server_notifier_thread_ = new sync_notifier::ServerNotifierThread( |
- notifier_options_, state, this); |
- talk_mediator_.reset( |
- new TalkMediatorImpl(server_notifier_thread_, |
- notifier_options_.invalidate_xmpp_login, |
- notifier_options_.allow_insecure_connection)); |
- |
- // Since we may be initialized more than once, make sure that any |
- // newly created server notifier thread has the latest enabled types. |
- server_notifier_thread_->UpdateEnabledTypes(enabled_types_); |
- } else { |
- notifier::MediatorThread* mediator_thread = |
- new notifier::MediatorThreadImpl(notifier_options_); |
- talk_mediator_.reset( |
- new TalkMediatorImpl(mediator_thread, |
- notifier_options_.invalidate_xmpp_login, |
- notifier_options_.allow_insecure_connection)); |
- talk_mediator_->AddSubscribedServiceUrl(browser_sync::kSyncServiceUrl); |
- server_notifier_thread_ = NULL; |
- } |
- talk_mediator()->SetDelegate(this); |
+ sync_notifier_->UpdateEnabledTypes(types); |
} |
void SyncManager::SyncInternal::RaiseAuthNeededEvent() { |
@@ -2136,12 +2059,6 @@ void SyncManager::Shutdown() { |
void SyncManager::SyncInternal::Shutdown() { |
method_factory_.RevokeAll(); |
- // We NULL out talk_mediator_ so that any tasks pumped below do not |
- // trigger further XMPP actions. |
- // |
- // TODO(akalin): NULL the other member variables defensively, too. |
- scoped_ptr<TalkMediator> talk_mediator(talk_mediator_.release()); |
- |
if (syncer_thread()) { |
if (!syncer_thread()->Stop(kThreadExitTimeoutMsec)) { |
LOG(FATAL) << "Unable to stop the syncer, it won't be happy..."; |
@@ -2149,17 +2066,12 @@ void SyncManager::SyncInternal::Shutdown() { |
syncer_thread_.reset(); |
} |
- // Shutdown the xmpp buzz connection. |
- if (talk_mediator.get()) { |
- VLOG(1) << "P2P: Mediator logout started."; |
- talk_mediator->Logout(); |
- VLOG(1) << "P2P: Mediator logout completed."; |
- talk_mediator.reset(); |
- |
- // |server_notifier_thread_| is owned by |talk_mediator|. We NULL |
- // it out here so as to not have a dangling pointer. |
- server_notifier_thread_= NULL; |
- VLOG(1) << "P2P: Mediator destroyed."; |
+ // We NULL out sync_notifer_ so that any pending tasks do not |
+ // trigger further notifications. |
+ // TODO(akalin): NULL the other member variables defensively, too. |
+ if (sync_notifier_.get()) { |
+ sync_notifier_->RemoveObserver(this); |
+ sync_notifier_.reset(); |
} |
// Pump any messages the auth watcher, syncer thread, or talk |
@@ -2487,18 +2399,19 @@ void SyncManager::SyncInternal::OnSyncEngineEvent( |
OnSyncCycleCompleted(event.snapshot)); |
} |
- if (notifier_options_.notification_method != |
- notifier::NOTIFICATION_SERVER) { |
- // TODO(chron): Consider changing this back to track has_more_to_sync |
- // only notify peers if a successful commit has occurred. |
- bool new_pending_notification = |
- (event.snapshot->syncer_status.num_successful_commits > 0); |
+ // This is here for tests, which are still using p2p notifications. |
+ // SendNotification does not do anything if we are using server based |
+ // notifications. |
+ // TODO(chron): Consider changing this back to track has_more_to_sync |
+ // only notify peers if a successful commit has occurred. |
+ bool new_notification = |
+ (event.snapshot->syncer_status.num_successful_commits > 0); |
+ if (new_notification) { |
core_message_loop_->PostTask( |
FROM_HERE, |
NewRunnableMethod( |
this, |
- &SyncManager::SyncInternal::SendPendingXMPPNotification, |
- new_pending_notification)); |
+ &SyncManager::SyncInternal::SendNotification)); |
} |
} |
@@ -2646,87 +2559,16 @@ void SyncManager::SyncInternal::OnNotificationStateChange( |
parent_router_->RouteJsEvent("onSyncNotificationStateChange", |
browser_sync::JsArgList(args), NULL); |
} |
- if ((notifier_options_.notification_method != |
- notifier::NOTIFICATION_SERVER) && notifications_enabled) { |
- // Nudge the syncer thread when notifications are enabled, in case there is |
- // any data that has not yet been synced. If we are listening to |
- // server-issued notifications, we are already guaranteed to receive a |
- // notification on a successful connection. |
- if (syncer_thread()) { |
- syncer_thread()->NudgeSyncer(0, SyncerThread::kLocal); |
- } |
- |
- // Send a notification as soon as subscriptions are on |
- // (see http://code.google.com/p/chromium/issues/detail?id=38563 ). |
- core_message_loop_->PostTask( |
- FROM_HERE, |
- NewRunnableMethod( |
- this, |
- &SyncManager::SyncInternal::SendPendingXMPPNotification, |
- true)); |
- } |
-} |
- |
-void SyncManager::SyncInternal::TalkMediatorLogin( |
- const std::string& email, const std::string& token) { |
- DCHECK_EQ(MessageLoop::current(), core_message_loop_); |
- DCHECK(!email.empty()); |
- DCHECK(!token.empty()); |
- InitializeTalkMediator(); |
- talk_mediator()->SetAuthToken(email, token, SYNC_SERVICE_NAME); |
- talk_mediator()->Login(); |
} |
void SyncManager::SyncInternal::OnIncomingNotification( |
- const IncomingNotificationData& notification_data) { |
- browser_sync::sessions::TypePayloadMap model_types_with_payloads; |
- |
- // Check if the service url is a sync URL. An empty service URL is |
- // treated as a legacy sync notification. If we're listening to |
- // server-issued notifications, no need to check the service_url. |
- if (notifier_options_.notification_method == |
- notifier::NOTIFICATION_SERVER) { |
- VLOG(1) << "Sync received server notification from " << |
- notification_data.service_url << ": " << |
- notification_data.service_specific_data; |
- syncable::ModelTypeBitSet model_types; |
- const std::string& model_type_list = notification_data.service_url; |
- const std::string& notification_payload = |
- notification_data.service_specific_data; |
- |
- if (!syncable::ModelTypeBitSetFromString(model_type_list, &model_types)) { |
- LOG(DFATAL) << "Could not extract model types from server data."; |
- model_types.set(); |
- } |
- |
- model_types_with_payloads = |
- browser_sync::sessions::MakeTypePayloadMapFromBitSet(model_types, |
- notification_payload); |
- } else if (notification_data.service_url.empty() || |
- (notification_data.service_url == |
- browser_sync::kSyncLegacyServiceUrl) || |
- (notification_data.service_url == |
- browser_sync::kSyncServiceUrl)) { |
- VLOG(1) << "Sync received P2P notification."; |
- |
- // Catch for sync integration tests (uses p2p). Just set all enabled |
- // datatypes. |
- ModelSafeRoutingInfo routes; |
- registrar_->GetModelSafeRoutingInfo(&routes); |
- model_types_with_payloads = |
- browser_sync::sessions::MakeTypePayloadMapFromRoutingInfo(routes, |
- std::string()); |
- } else { |
- LOG(WARNING) << "Notification fron unexpected source: " |
- << notification_data.service_url; |
- } |
- |
- if (!model_types_with_payloads.empty()) { |
+ const browser_sync::sessions::TypePayloadMap& type_payloads) { |
+ if (!type_payloads.empty()) { |
if (syncer_thread()) { |
syncer_thread()->NudgeSyncerWithPayloads( |
kSyncerThreadDelayMsec, |
SyncerThread::kNotification, |
- model_types_with_payloads); |
+ type_payloads); |
} |
allstatus_.IncrementNotificationsReceived(); |
} else { |
@@ -2738,8 +2580,8 @@ void SyncManager::SyncInternal::OnIncomingNotification( |
ListValue* changed_types = new ListValue(); |
args.Append(changed_types); |
for (browser_sync::sessions::TypePayloadMap::const_iterator |
- it = model_types_with_payloads.begin(); |
- it != model_types_with_payloads.end(); ++it) { |
+ it = type_payloads.begin(); |
+ it != type_payloads.end(); ++it) { |
const std::string& model_type_str = |
syncable::ModelTypeToString(it->first); |
changed_types->Append(Value::CreateStringValue(model_type_str)); |
@@ -2749,13 +2591,8 @@ void SyncManager::SyncInternal::OnIncomingNotification( |
} |
} |
-void SyncManager::SyncInternal::OnOutgoingNotification() { |
- DCHECK_NE(notifier_options_.notification_method, |
- notifier::NOTIFICATION_SERVER); |
- allstatus_.IncrementNotificationsSent(); |
-} |
- |
-void SyncManager::SyncInternal::WriteState(const std::string& state) { |
+void SyncManager::SyncInternal::StoreState( |
+ const std::string& state) { |
syncable::ScopedDirLookup lookup(dir_manager(), username_for_share()); |
if (!lookup.good()) { |
LOG(ERROR) << "Could not write notification state"; |
@@ -2837,10 +2674,11 @@ void SyncManager::TriggerOnNotificationStateChangeForTest( |
void SyncManager::TriggerOnIncomingNotificationForTest( |
const syncable::ModelTypeBitSet& model_types) { |
- IncomingNotificationData notification_data; |
- notification_data.service_url = model_types.to_string(); |
- // Here we rely on the default notification method being SERVER. |
- data_->OnIncomingNotification(notification_data); |
+ browser_sync::sessions::TypePayloadMap model_types_with_payloads = |
+ browser_sync::sessions::MakeTypePayloadMapFromBitSet(model_types, |
+ std::string()); |
+ |
+ data_->OnIncomingNotification(model_types_with_payloads); |
} |
} // namespace sync_api |