Index: sync/notifier/sync_invalidation_listener.cc |
diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..8a6acb1b0a001067e4198fe594ab959525a13426 |
--- /dev/null |
+++ b/sync/notifier/sync_invalidation_listener.cc |
@@ -0,0 +1,443 @@ |
+// Copyright (c) 2012 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "sync/notifier/sync_invalidation_listener.h" |
+ |
+#include <vector> |
+ |
+#include "base/bind.h" |
+#include "base/callback.h" |
+#include "base/compiler_specific.h" |
+#include "base/logging.h" |
+#include "base/tracked_objects.h" |
+#include "google/cacheinvalidation/include/invalidation-client.h" |
+#include "google/cacheinvalidation/include/types.h" |
+#include "jingle/notifier/listener/push_client.h" |
+#include "sync/notifier/invalidation_util.h" |
+#include "sync/notifier/object_id_invalidation_map.h" |
+#include "sync/notifier/registration_manager.h" |
+ |
+namespace { |
+ |
+const char kApplicationName[] = "chrome-sync"; |
+ |
+} // namespace |
+ |
+namespace syncer { |
+ |
+SyncInvalidationListener::Delegate::~Delegate() {} |
+ |
+SyncInvalidationListener::SyncInvalidationListener( |
+ scoped_ptr<SyncNetworkChannel> network_channel) |
+ : sync_network_channel_(network_channel.Pass()), |
+ sync_system_resources_(sync_network_channel_.get(), this), |
+ delegate_(NULL), |
+ ticl_state_(DEFAULT_INVALIDATION_ERROR), |
+ push_client_state_(DEFAULT_INVALIDATION_ERROR), |
+ weak_ptr_factory_(this) { |
+ DCHECK(CalledOnValidThread()); |
+ sync_network_channel_->AddObserver(this); |
+} |
+ |
+SyncInvalidationListener::~SyncInvalidationListener() { |
+ DCHECK(CalledOnValidThread()); |
+ sync_network_channel_->RemoveObserver(this); |
+ Stop(); |
+ DCHECK(!delegate_); |
+} |
+ |
+void SyncInvalidationListener::Start( |
+ const CreateInvalidationClientCallback& |
+ create_invalidation_client_callback, |
+ const std::string& client_id, const std::string& client_info, |
+ const std::string& invalidation_bootstrap_data, |
+ const UnackedInvalidationsMap& initial_unacked_invalidations, |
+ const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, |
+ Delegate* delegate) { |
+ DCHECK(CalledOnValidThread()); |
+ Stop(); |
+ |
+ sync_system_resources_.set_platform(client_info); |
+ sync_system_resources_.Start(); |
+ |
+ // The Storage resource is implemented as a write-through cache. We populate |
+ // it with the initial state on startup, so subsequent writes go to disk and |
+ // update the in-memory cache, while reads just return the cached state. |
+ sync_system_resources_.storage()->SetInitialState( |
+ invalidation_bootstrap_data); |
+ |
+ unacked_invalidations_map_ = initial_unacked_invalidations; |
+ invalidation_state_tracker_ = invalidation_state_tracker; |
+ DCHECK(invalidation_state_tracker_.IsInitialized()); |
+ |
+ DCHECK(!delegate_); |
+ DCHECK(delegate); |
+ delegate_ = delegate; |
+ |
+ invalidation_client_.reset(create_invalidation_client_callback.Run( |
+ &sync_system_resources_, |
+ sync_network_channel_->GetInvalidationClientType(), |
+ client_id, |
+ kApplicationName, |
+ this)); |
+ invalidation_client_->Start(); |
+ |
+ registration_manager_.reset( |
+ new RegistrationManager(invalidation_client_.get())); |
+} |
+ |
+void SyncInvalidationListener::UpdateCredentials( |
+ const std::string& email, const std::string& token) { |
+ DCHECK(CalledOnValidThread()); |
+ sync_network_channel_->UpdateCredentials(email, token); |
+} |
+ |
+void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
+ DCHECK(CalledOnValidThread()); |
+ registered_ids_ = ids; |
+ // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a |
+ // working XMPP connection (as observed by us), so check it instead |
+ // of GetState() (see http://crbug.com/139424). |
+ if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) { |
+ DoRegistrationUpdate(); |
+ } |
+} |
+ |
+void SyncInvalidationListener::Ready( |
+ invalidation::InvalidationClient* client) { |
+ DCHECK(CalledOnValidThread()); |
+ DCHECK_EQ(client, invalidation_client_.get()); |
+ ticl_state_ = INVALIDATIONS_ENABLED; |
+ EmitStateChange(); |
+ DoRegistrationUpdate(); |
+} |
+ |
+void SyncInvalidationListener::Invalidate( |
+ invalidation::InvalidationClient* client, |
+ const invalidation::Invalidation& invalidation, |
+ const invalidation::AckHandle& ack_handle) { |
+ DCHECK(CalledOnValidThread()); |
+ DCHECK_EQ(client, invalidation_client_.get()); |
+ client->Acknowledge(ack_handle); |
+ |
+ const invalidation::ObjectId& id = invalidation.object_id(); |
+ |
+ std::string payload; |
+ // payload() CHECK()'s has_payload(), so we must check it ourselves first. |
+ if (invalidation.has_payload()) |
+ payload = invalidation.payload(); |
+ |
+ DVLOG(2) << "Received invalidation with version " << invalidation.version() |
+ << " for " << ObjectIdToString(id); |
+ |
+ ObjectIdInvalidationMap invalidations; |
+ Invalidation inv = Invalidation::Init(id, invalidation.version(), payload); |
+ inv.set_ack_handler(GetThisAsAckHandler()); |
+ invalidations.Insert(inv); |
+ |
+ DispatchInvalidations(invalidations); |
+} |
+ |
+void SyncInvalidationListener::InvalidateUnknownVersion( |
+ invalidation::InvalidationClient* client, |
+ const invalidation::ObjectId& object_id, |
+ const invalidation::AckHandle& ack_handle) { |
+ DCHECK(CalledOnValidThread()); |
+ DCHECK_EQ(client, invalidation_client_.get()); |
+ DVLOG(1) << "InvalidateUnknownVersion"; |
+ client->Acknowledge(ack_handle); |
+ |
+ ObjectIdInvalidationMap invalidations; |
+ Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id); |
+ unknown_version.set_ack_handler(GetThisAsAckHandler()); |
+ invalidations.Insert(unknown_version); |
+ |
+ DispatchInvalidations(invalidations); |
+} |
+ |
+// This should behave as if we got an invalidation with version |
+// UNKNOWN_OBJECT_VERSION for all known data types. |
+void SyncInvalidationListener::InvalidateAll( |
+ invalidation::InvalidationClient* client, |
+ const invalidation::AckHandle& ack_handle) { |
+ DCHECK(CalledOnValidThread()); |
+ DCHECK_EQ(client, invalidation_client_.get()); |
+ DVLOG(1) << "InvalidateAll"; |
+ client->Acknowledge(ack_handle); |
+ |
+ ObjectIdInvalidationMap invalidations; |
+ for (ObjectIdSet::iterator it = registered_ids_.begin(); |
+ it != registered_ids_.end(); ++it) { |
+ Invalidation unknown_version = Invalidation::InitUnknownVersion(*it); |
+ unknown_version.set_ack_handler(GetThisAsAckHandler()); |
+ invalidations.Insert(unknown_version); |
+ } |
+ |
+ DispatchInvalidations(invalidations); |
+} |
+ |
+// If a handler is registered, emit right away. Otherwise, save it for later. |
+void SyncInvalidationListener::DispatchInvalidations( |
+ const ObjectIdInvalidationMap& invalidations) { |
+ DCHECK(CalledOnValidThread()); |
+ |
+ ObjectIdInvalidationMap to_save = invalidations; |
+ ObjectIdInvalidationMap to_emit = |
+ invalidations.GetSubsetWithObjectIds(registered_ids_); |
+ |
+ SaveInvalidations(to_save); |
+ EmitSavedInvalidations(to_emit); |
+} |
+ |
+void SyncInvalidationListener::SaveInvalidations( |
+ const ObjectIdInvalidationMap& to_save) { |
+ ObjectIdSet objects_to_save = to_save.GetObjectIds(); |
+ for (ObjectIdSet::const_iterator it = objects_to_save.begin(); |
+ it != objects_to_save.end(); ++it) { |
+ UnackedInvalidationsMap::iterator lookup = |
+ unacked_invalidations_map_.find(*it); |
+ if (lookup == unacked_invalidations_map_.end()) { |
+ lookup = unacked_invalidations_map_.insert( |
+ std::make_pair(*it, UnackedInvalidationSet(*it))).first; |
+ } |
+ lookup->second.AddSet(to_save.ForObject(*it)); |
+ } |
+ |
+ invalidation_state_tracker_.Call( |
+ FROM_HERE, |
+ &InvalidationStateTracker::SetSavedInvalidations, |
+ unacked_invalidations_map_); |
+} |
+ |
+void SyncInvalidationListener::EmitSavedInvalidations( |
+ const ObjectIdInvalidationMap& to_emit) { |
+ DVLOG(2) << "Emitting invalidations: " << to_emit.ToString(); |
+ delegate_->OnInvalidate(to_emit); |
+} |
+ |
+void SyncInvalidationListener::InformRegistrationStatus( |
+ invalidation::InvalidationClient* client, |
+ const invalidation::ObjectId& object_id, |
+ InvalidationListener::RegistrationState new_state) { |
+ DCHECK(CalledOnValidThread()); |
+ DCHECK_EQ(client, invalidation_client_.get()); |
+ DVLOG(1) << "InformRegistrationStatus: " |
+ << ObjectIdToString(object_id) << " " << new_state; |
+ |
+ if (new_state != InvalidationListener::REGISTERED) { |
+ // Let |registration_manager_| handle the registration backoff policy. |
+ registration_manager_->MarkRegistrationLost(object_id); |
+ } |
+} |
+ |
+void SyncInvalidationListener::InformRegistrationFailure( |
+ invalidation::InvalidationClient* client, |
+ const invalidation::ObjectId& object_id, |
+ bool is_transient, |
+ const std::string& error_message) { |
+ DCHECK(CalledOnValidThread()); |
+ DCHECK_EQ(client, invalidation_client_.get()); |
+ DVLOG(1) << "InformRegistrationFailure: " |
+ << ObjectIdToString(object_id) |
+ << "is_transient=" << is_transient |
+ << ", message=" << error_message; |
+ |
+ if (is_transient) { |
+ // We don't care about |unknown_hint|; we let |
+ // |registration_manager_| handle the registration backoff policy. |
+ registration_manager_->MarkRegistrationLost(object_id); |
+ } else { |
+ // Non-transient failures require an action to resolve. This could happen |
+ // because: |
+ // - the server doesn't yet recognize the data type, which could happen for |
+ // brand-new data types. |
+ // - the user has changed his password and hasn't updated it yet locally. |
+ // Either way, block future registration attempts for |object_id|. However, |
+ // we don't forget any saved invalidation state since we may use it once the |
+ // error is addressed. |
+ registration_manager_->DisableId(object_id); |
+ } |
+} |
+ |
+void SyncInvalidationListener::ReissueRegistrations( |
+ invalidation::InvalidationClient* client, |
+ const std::string& prefix, |
+ int prefix_length) { |
+ DCHECK(CalledOnValidThread()); |
+ DCHECK_EQ(client, invalidation_client_.get()); |
+ DVLOG(1) << "AllRegistrationsLost"; |
+ registration_manager_->MarkAllRegistrationsLost(); |
+} |
+ |
+void SyncInvalidationListener::InformError( |
+ invalidation::InvalidationClient* client, |
+ const invalidation::ErrorInfo& error_info) { |
+ DCHECK(CalledOnValidThread()); |
+ DCHECK_EQ(client, invalidation_client_.get()); |
+ LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": " |
+ << error_info.error_message() |
+ << " (transient = " << error_info.is_transient() << ")"; |
+ if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) { |
+ ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED; |
+ } else { |
+ ticl_state_ = TRANSIENT_INVALIDATION_ERROR; |
+ } |
+ EmitStateChange(); |
+} |
+ |
+void SyncInvalidationListener::Acknowledge( |
+ const invalidation::ObjectId& id, |
+ const syncer::AckHandle& handle) { |
+ UnackedInvalidationsMap::iterator lookup = |
+ unacked_invalidations_map_.find(id); |
+ if (lookup == unacked_invalidations_map_.end()) { |
+ DLOG(WARNING) << "Received acknowledgement for untracked object ID"; |
+ return; |
+ } |
+ lookup->second.Acknowledge(handle); |
+ invalidation_state_tracker_.Call( |
+ FROM_HERE, |
+ &InvalidationStateTracker::SetSavedInvalidations, |
+ unacked_invalidations_map_); |
+} |
+ |
+void SyncInvalidationListener::Drop( |
+ const invalidation::ObjectId& id, |
+ const syncer::AckHandle& handle) { |
+ UnackedInvalidationsMap::iterator lookup = |
+ unacked_invalidations_map_.find(id); |
+ if (lookup == unacked_invalidations_map_.end()) { |
+ DLOG(WARNING) << "Received drop for untracked object ID"; |
+ return; |
+ } |
+ lookup->second.Drop(handle); |
+ invalidation_state_tracker_.Call( |
+ FROM_HERE, |
+ &InvalidationStateTracker::SetSavedInvalidations, |
+ unacked_invalidations_map_); |
+} |
+ |
+void SyncInvalidationListener::WriteState(const std::string& state) { |
+ DCHECK(CalledOnValidThread()); |
+ DVLOG(1) << "WriteState"; |
+ invalidation_state_tracker_.Call( |
+ FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); |
+} |
+ |
+void SyncInvalidationListener::DoRegistrationUpdate() { |
+ DCHECK(CalledOnValidThread()); |
+ const ObjectIdSet& unregistered_ids = |
+ registration_manager_->UpdateRegisteredIds(registered_ids_); |
+ for (ObjectIdSet::iterator it = unregistered_ids.begin(); |
+ it != unregistered_ids.end(); ++it) { |
+ unacked_invalidations_map_.erase(*it); |
+ } |
+ invalidation_state_tracker_.Call( |
+ FROM_HERE, |
+ &InvalidationStateTracker::SetSavedInvalidations, |
+ unacked_invalidations_map_); |
+ |
+ ObjectIdInvalidationMap object_id_invalidation_map; |
+ for (UnackedInvalidationsMap::iterator map_it = |
+ unacked_invalidations_map_.begin(); |
+ map_it != unacked_invalidations_map_.end(); ++map_it) { |
+ if (registered_ids_.find(map_it->first) == registered_ids_.end()) { |
+ continue; |
+ } |
+ map_it->second.ExportInvalidations( |
+ GetThisAsAckHandler(), |
+ &object_id_invalidation_map); |
+ } |
+ |
+ // There's no need to run these through DispatchInvalidations(); they've |
+ // already been saved to storage (that's where we found them) so all we need |
+ // to do now is emit them. |
+ EmitSavedInvalidations(object_id_invalidation_map); |
+} |
+ |
+void SyncInvalidationListener::RequestDetailedStatus( |
+ base::Callback<void(const base::DictionaryValue&)> callback) const { |
+ DCHECK(CalledOnValidThread()); |
+ sync_network_channel_->RequestDetailedStatus(callback); |
+ callback.Run(*CollectDebugData()); |
+} |
+ |
+scoped_ptr<base::DictionaryValue> |
+SyncInvalidationListener::CollectDebugData() const { |
+ scoped_ptr<base::DictionaryValue> return_value(new base::DictionaryValue()); |
+ return_value->SetString( |
+ "SyncInvalidationListener.PushClientState", |
+ std::string(InvalidatorStateToString(push_client_state_))); |
+ return_value->SetString("SyncInvalidationListener.TiclState", |
+ std::string(InvalidatorStateToString(ticl_state_))); |
+ scoped_ptr<base::DictionaryValue> unacked_map(new base::DictionaryValue()); |
+ for (UnackedInvalidationsMap::const_iterator it = |
+ unacked_invalidations_map_.begin(); |
+ it != unacked_invalidations_map_.end(); |
+ ++it) { |
+ unacked_map->Set((it->first).name(), (it->second).ToValue().release()); |
+ } |
+ return_value->Set("SyncInvalidationListener.UnackedInvalidationsMap", |
+ unacked_map.release()); |
+ return return_value.Pass(); |
+} |
+ |
+void SyncInvalidationListener::StopForTest() { |
+ DCHECK(CalledOnValidThread()); |
+ Stop(); |
+} |
+ |
+void SyncInvalidationListener::Stop() { |
+ DCHECK(CalledOnValidThread()); |
+ if (!invalidation_client_) { |
+ return; |
+ } |
+ |
+ registration_manager_.reset(); |
+ sync_system_resources_.Stop(); |
+ invalidation_client_->Stop(); |
+ |
+ invalidation_client_.reset(); |
+ delegate_ = NULL; |
+ |
+ ticl_state_ = DEFAULT_INVALIDATION_ERROR; |
+ push_client_state_ = DEFAULT_INVALIDATION_ERROR; |
+} |
+ |
+InvalidatorState SyncInvalidationListener::GetState() const { |
+ DCHECK(CalledOnValidThread()); |
+ if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || |
+ push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { |
+ // If either the ticl or the push client rejected our credentials, |
+ // return INVALIDATION_CREDENTIALS_REJECTED. |
+ return INVALIDATION_CREDENTIALS_REJECTED; |
+ } |
+ if (ticl_state_ == INVALIDATIONS_ENABLED && |
+ push_client_state_ == INVALIDATIONS_ENABLED) { |
+ // If the ticl is ready and the push client notifications are |
+ // enabled, return INVALIDATIONS_ENABLED. |
+ return INVALIDATIONS_ENABLED; |
+ } |
+ // Otherwise, we have a transient error. |
+ return TRANSIENT_INVALIDATION_ERROR; |
+} |
+ |
+void SyncInvalidationListener::EmitStateChange() { |
+ DCHECK(CalledOnValidThread()); |
+ delegate_->OnInvalidatorStateChange(GetState()); |
+} |
+ |
+WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() { |
+ DCHECK(CalledOnValidThread()); |
+ return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr()); |
+} |
+ |
+void SyncInvalidationListener::OnNetworkChannelStateChanged( |
+ InvalidatorState invalidator_state) { |
+ DCHECK(CalledOnValidThread()); |
+ push_client_state_ = invalidator_state; |
+ EmitStateChange(); |
+} |
+ |
+} // namespace syncer |