| Index: sync/notifier/sync_invalidation_listener.cc
|
| diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..8a6acb1b0a001067e4198fe594ab959525a13426
|
| --- /dev/null
|
| +++ b/sync/notifier/sync_invalidation_listener.cc
|
| @@ -0,0 +1,443 @@
|
| +// Copyright (c) 2012 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "sync/notifier/sync_invalidation_listener.h"
|
| +
|
| +#include <vector>
|
| +
|
| +#include "base/bind.h"
|
| +#include "base/callback.h"
|
| +#include "base/compiler_specific.h"
|
| +#include "base/logging.h"
|
| +#include "base/tracked_objects.h"
|
| +#include "google/cacheinvalidation/include/invalidation-client.h"
|
| +#include "google/cacheinvalidation/include/types.h"
|
| +#include "jingle/notifier/listener/push_client.h"
|
| +#include "sync/notifier/invalidation_util.h"
|
| +#include "sync/notifier/object_id_invalidation_map.h"
|
| +#include "sync/notifier/registration_manager.h"
|
| +
|
| +namespace {
|
| +
|
| +const char kApplicationName[] = "chrome-sync";
|
| +
|
| +} // namespace
|
| +
|
| +namespace syncer {
|
| +
|
| +SyncInvalidationListener::Delegate::~Delegate() {}
|
| +
|
| +SyncInvalidationListener::SyncInvalidationListener(
|
| + scoped_ptr<SyncNetworkChannel> network_channel)
|
| + : sync_network_channel_(network_channel.Pass()),
|
| + sync_system_resources_(sync_network_channel_.get(), this),
|
| + delegate_(NULL),
|
| + ticl_state_(DEFAULT_INVALIDATION_ERROR),
|
| + push_client_state_(DEFAULT_INVALIDATION_ERROR),
|
| + weak_ptr_factory_(this) {
|
| + DCHECK(CalledOnValidThread());
|
| + sync_network_channel_->AddObserver(this);
|
| +}
|
| +
|
| +SyncInvalidationListener::~SyncInvalidationListener() {
|
| + DCHECK(CalledOnValidThread());
|
| + sync_network_channel_->RemoveObserver(this);
|
| + Stop();
|
| + DCHECK(!delegate_);
|
| +}
|
| +
|
| +void SyncInvalidationListener::Start(
|
| + const CreateInvalidationClientCallback&
|
| + create_invalidation_client_callback,
|
| + const std::string& client_id, const std::string& client_info,
|
| + const std::string& invalidation_bootstrap_data,
|
| + const UnackedInvalidationsMap& initial_unacked_invalidations,
|
| + const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
|
| + Delegate* delegate) {
|
| + DCHECK(CalledOnValidThread());
|
| + Stop();
|
| +
|
| + sync_system_resources_.set_platform(client_info);
|
| + sync_system_resources_.Start();
|
| +
|
| + // The Storage resource is implemented as a write-through cache. We populate
|
| + // it with the initial state on startup, so subsequent writes go to disk and
|
| + // update the in-memory cache, while reads just return the cached state.
|
| + sync_system_resources_.storage()->SetInitialState(
|
| + invalidation_bootstrap_data);
|
| +
|
| + unacked_invalidations_map_ = initial_unacked_invalidations;
|
| + invalidation_state_tracker_ = invalidation_state_tracker;
|
| + DCHECK(invalidation_state_tracker_.IsInitialized());
|
| +
|
| + DCHECK(!delegate_);
|
| + DCHECK(delegate);
|
| + delegate_ = delegate;
|
| +
|
| + invalidation_client_.reset(create_invalidation_client_callback.Run(
|
| + &sync_system_resources_,
|
| + sync_network_channel_->GetInvalidationClientType(),
|
| + client_id,
|
| + kApplicationName,
|
| + this));
|
| + invalidation_client_->Start();
|
| +
|
| + registration_manager_.reset(
|
| + new RegistrationManager(invalidation_client_.get()));
|
| +}
|
| +
|
| +void SyncInvalidationListener::UpdateCredentials(
|
| + const std::string& email, const std::string& token) {
|
| + DCHECK(CalledOnValidThread());
|
| + sync_network_channel_->UpdateCredentials(email, token);
|
| +}
|
| +
|
| +void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
|
| + DCHECK(CalledOnValidThread());
|
| + registered_ids_ = ids;
|
| + // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
|
| + // working XMPP connection (as observed by us), so check it instead
|
| + // of GetState() (see http://crbug.com/139424).
|
| + if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) {
|
| + DoRegistrationUpdate();
|
| + }
|
| +}
|
| +
|
| +void SyncInvalidationListener::Ready(
|
| + invalidation::InvalidationClient* client) {
|
| + DCHECK(CalledOnValidThread());
|
| + DCHECK_EQ(client, invalidation_client_.get());
|
| + ticl_state_ = INVALIDATIONS_ENABLED;
|
| + EmitStateChange();
|
| + DoRegistrationUpdate();
|
| +}
|
| +
|
| +void SyncInvalidationListener::Invalidate(
|
| + invalidation::InvalidationClient* client,
|
| + const invalidation::Invalidation& invalidation,
|
| + const invalidation::AckHandle& ack_handle) {
|
| + DCHECK(CalledOnValidThread());
|
| + DCHECK_EQ(client, invalidation_client_.get());
|
| + client->Acknowledge(ack_handle);
|
| +
|
| + const invalidation::ObjectId& id = invalidation.object_id();
|
| +
|
| + std::string payload;
|
| + // payload() CHECK()'s has_payload(), so we must check it ourselves first.
|
| + if (invalidation.has_payload())
|
| + payload = invalidation.payload();
|
| +
|
| + DVLOG(2) << "Received invalidation with version " << invalidation.version()
|
| + << " for " << ObjectIdToString(id);
|
| +
|
| + ObjectIdInvalidationMap invalidations;
|
| + Invalidation inv = Invalidation::Init(id, invalidation.version(), payload);
|
| + inv.set_ack_handler(GetThisAsAckHandler());
|
| + invalidations.Insert(inv);
|
| +
|
| + DispatchInvalidations(invalidations);
|
| +}
|
| +
|
| +void SyncInvalidationListener::InvalidateUnknownVersion(
|
| + invalidation::InvalidationClient* client,
|
| + const invalidation::ObjectId& object_id,
|
| + const invalidation::AckHandle& ack_handle) {
|
| + DCHECK(CalledOnValidThread());
|
| + DCHECK_EQ(client, invalidation_client_.get());
|
| + DVLOG(1) << "InvalidateUnknownVersion";
|
| + client->Acknowledge(ack_handle);
|
| +
|
| + ObjectIdInvalidationMap invalidations;
|
| + Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id);
|
| + unknown_version.set_ack_handler(GetThisAsAckHandler());
|
| + invalidations.Insert(unknown_version);
|
| +
|
| + DispatchInvalidations(invalidations);
|
| +}
|
| +
|
| +// This should behave as if we got an invalidation with version
|
| +// UNKNOWN_OBJECT_VERSION for all known data types.
|
| +void SyncInvalidationListener::InvalidateAll(
|
| + invalidation::InvalidationClient* client,
|
| + const invalidation::AckHandle& ack_handle) {
|
| + DCHECK(CalledOnValidThread());
|
| + DCHECK_EQ(client, invalidation_client_.get());
|
| + DVLOG(1) << "InvalidateAll";
|
| + client->Acknowledge(ack_handle);
|
| +
|
| + ObjectIdInvalidationMap invalidations;
|
| + for (ObjectIdSet::iterator it = registered_ids_.begin();
|
| + it != registered_ids_.end(); ++it) {
|
| + Invalidation unknown_version = Invalidation::InitUnknownVersion(*it);
|
| + unknown_version.set_ack_handler(GetThisAsAckHandler());
|
| + invalidations.Insert(unknown_version);
|
| + }
|
| +
|
| + DispatchInvalidations(invalidations);
|
| +}
|
| +
|
| +// If a handler is registered, emit right away. Otherwise, save it for later.
|
| +void SyncInvalidationListener::DispatchInvalidations(
|
| + const ObjectIdInvalidationMap& invalidations) {
|
| + DCHECK(CalledOnValidThread());
|
| +
|
| + ObjectIdInvalidationMap to_save = invalidations;
|
| + ObjectIdInvalidationMap to_emit =
|
| + invalidations.GetSubsetWithObjectIds(registered_ids_);
|
| +
|
| + SaveInvalidations(to_save);
|
| + EmitSavedInvalidations(to_emit);
|
| +}
|
| +
|
| +void SyncInvalidationListener::SaveInvalidations(
|
| + const ObjectIdInvalidationMap& to_save) {
|
| + ObjectIdSet objects_to_save = to_save.GetObjectIds();
|
| + for (ObjectIdSet::const_iterator it = objects_to_save.begin();
|
| + it != objects_to_save.end(); ++it) {
|
| + UnackedInvalidationsMap::iterator lookup =
|
| + unacked_invalidations_map_.find(*it);
|
| + if (lookup == unacked_invalidations_map_.end()) {
|
| + lookup = unacked_invalidations_map_.insert(
|
| + std::make_pair(*it, UnackedInvalidationSet(*it))).first;
|
| + }
|
| + lookup->second.AddSet(to_save.ForObject(*it));
|
| + }
|
| +
|
| + invalidation_state_tracker_.Call(
|
| + FROM_HERE,
|
| + &InvalidationStateTracker::SetSavedInvalidations,
|
| + unacked_invalidations_map_);
|
| +}
|
| +
|
| +void SyncInvalidationListener::EmitSavedInvalidations(
|
| + const ObjectIdInvalidationMap& to_emit) {
|
| + DVLOG(2) << "Emitting invalidations: " << to_emit.ToString();
|
| + delegate_->OnInvalidate(to_emit);
|
| +}
|
| +
|
| +void SyncInvalidationListener::InformRegistrationStatus(
|
| + invalidation::InvalidationClient* client,
|
| + const invalidation::ObjectId& object_id,
|
| + InvalidationListener::RegistrationState new_state) {
|
| + DCHECK(CalledOnValidThread());
|
| + DCHECK_EQ(client, invalidation_client_.get());
|
| + DVLOG(1) << "InformRegistrationStatus: "
|
| + << ObjectIdToString(object_id) << " " << new_state;
|
| +
|
| + if (new_state != InvalidationListener::REGISTERED) {
|
| + // Let |registration_manager_| handle the registration backoff policy.
|
| + registration_manager_->MarkRegistrationLost(object_id);
|
| + }
|
| +}
|
| +
|
| +void SyncInvalidationListener::InformRegistrationFailure(
|
| + invalidation::InvalidationClient* client,
|
| + const invalidation::ObjectId& object_id,
|
| + bool is_transient,
|
| + const std::string& error_message) {
|
| + DCHECK(CalledOnValidThread());
|
| + DCHECK_EQ(client, invalidation_client_.get());
|
| + DVLOG(1) << "InformRegistrationFailure: "
|
| + << ObjectIdToString(object_id)
|
| + << "is_transient=" << is_transient
|
| + << ", message=" << error_message;
|
| +
|
| + if (is_transient) {
|
| + // We don't care about |unknown_hint|; we let
|
| + // |registration_manager_| handle the registration backoff policy.
|
| + registration_manager_->MarkRegistrationLost(object_id);
|
| + } else {
|
| + // Non-transient failures require an action to resolve. This could happen
|
| + // because:
|
| + // - the server doesn't yet recognize the data type, which could happen for
|
| + // brand-new data types.
|
| + // - the user has changed his password and hasn't updated it yet locally.
|
| + // Either way, block future registration attempts for |object_id|. However,
|
| + // we don't forget any saved invalidation state since we may use it once the
|
| + // error is addressed.
|
| + registration_manager_->DisableId(object_id);
|
| + }
|
| +}
|
| +
|
| +void SyncInvalidationListener::ReissueRegistrations(
|
| + invalidation::InvalidationClient* client,
|
| + const std::string& prefix,
|
| + int prefix_length) {
|
| + DCHECK(CalledOnValidThread());
|
| + DCHECK_EQ(client, invalidation_client_.get());
|
| + DVLOG(1) << "AllRegistrationsLost";
|
| + registration_manager_->MarkAllRegistrationsLost();
|
| +}
|
| +
|
| +void SyncInvalidationListener::InformError(
|
| + invalidation::InvalidationClient* client,
|
| + const invalidation::ErrorInfo& error_info) {
|
| + DCHECK(CalledOnValidThread());
|
| + DCHECK_EQ(client, invalidation_client_.get());
|
| + LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": "
|
| + << error_info.error_message()
|
| + << " (transient = " << error_info.is_transient() << ")";
|
| + if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) {
|
| + ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED;
|
| + } else {
|
| + ticl_state_ = TRANSIENT_INVALIDATION_ERROR;
|
| + }
|
| + EmitStateChange();
|
| +}
|
| +
|
| +void SyncInvalidationListener::Acknowledge(
|
| + const invalidation::ObjectId& id,
|
| + const syncer::AckHandle& handle) {
|
| + UnackedInvalidationsMap::iterator lookup =
|
| + unacked_invalidations_map_.find(id);
|
| + if (lookup == unacked_invalidations_map_.end()) {
|
| + DLOG(WARNING) << "Received acknowledgement for untracked object ID";
|
| + return;
|
| + }
|
| + lookup->second.Acknowledge(handle);
|
| + invalidation_state_tracker_.Call(
|
| + FROM_HERE,
|
| + &InvalidationStateTracker::SetSavedInvalidations,
|
| + unacked_invalidations_map_);
|
| +}
|
| +
|
| +void SyncInvalidationListener::Drop(
|
| + const invalidation::ObjectId& id,
|
| + const syncer::AckHandle& handle) {
|
| + UnackedInvalidationsMap::iterator lookup =
|
| + unacked_invalidations_map_.find(id);
|
| + if (lookup == unacked_invalidations_map_.end()) {
|
| + DLOG(WARNING) << "Received drop for untracked object ID";
|
| + return;
|
| + }
|
| + lookup->second.Drop(handle);
|
| + invalidation_state_tracker_.Call(
|
| + FROM_HERE,
|
| + &InvalidationStateTracker::SetSavedInvalidations,
|
| + unacked_invalidations_map_);
|
| +}
|
| +
|
| +void SyncInvalidationListener::WriteState(const std::string& state) {
|
| + DCHECK(CalledOnValidThread());
|
| + DVLOG(1) << "WriteState";
|
| + invalidation_state_tracker_.Call(
|
| + FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state);
|
| +}
|
| +
|
| +void SyncInvalidationListener::DoRegistrationUpdate() {
|
| + DCHECK(CalledOnValidThread());
|
| + const ObjectIdSet& unregistered_ids =
|
| + registration_manager_->UpdateRegisteredIds(registered_ids_);
|
| + for (ObjectIdSet::iterator it = unregistered_ids.begin();
|
| + it != unregistered_ids.end(); ++it) {
|
| + unacked_invalidations_map_.erase(*it);
|
| + }
|
| + invalidation_state_tracker_.Call(
|
| + FROM_HERE,
|
| + &InvalidationStateTracker::SetSavedInvalidations,
|
| + unacked_invalidations_map_);
|
| +
|
| + ObjectIdInvalidationMap object_id_invalidation_map;
|
| + for (UnackedInvalidationsMap::iterator map_it =
|
| + unacked_invalidations_map_.begin();
|
| + map_it != unacked_invalidations_map_.end(); ++map_it) {
|
| + if (registered_ids_.find(map_it->first) == registered_ids_.end()) {
|
| + continue;
|
| + }
|
| + map_it->second.ExportInvalidations(
|
| + GetThisAsAckHandler(),
|
| + &object_id_invalidation_map);
|
| + }
|
| +
|
| + // There's no need to run these through DispatchInvalidations(); they've
|
| + // already been saved to storage (that's where we found them) so all we need
|
| + // to do now is emit them.
|
| + EmitSavedInvalidations(object_id_invalidation_map);
|
| +}
|
| +
|
| +void SyncInvalidationListener::RequestDetailedStatus(
|
| + base::Callback<void(const base::DictionaryValue&)> callback) const {
|
| + DCHECK(CalledOnValidThread());
|
| + sync_network_channel_->RequestDetailedStatus(callback);
|
| + callback.Run(*CollectDebugData());
|
| +}
|
| +
|
| +scoped_ptr<base::DictionaryValue>
|
| +SyncInvalidationListener::CollectDebugData() const {
|
| + scoped_ptr<base::DictionaryValue> return_value(new base::DictionaryValue());
|
| + return_value->SetString(
|
| + "SyncInvalidationListener.PushClientState",
|
| + std::string(InvalidatorStateToString(push_client_state_)));
|
| + return_value->SetString("SyncInvalidationListener.TiclState",
|
| + std::string(InvalidatorStateToString(ticl_state_)));
|
| + scoped_ptr<base::DictionaryValue> unacked_map(new base::DictionaryValue());
|
| + for (UnackedInvalidationsMap::const_iterator it =
|
| + unacked_invalidations_map_.begin();
|
| + it != unacked_invalidations_map_.end();
|
| + ++it) {
|
| + unacked_map->Set((it->first).name(), (it->second).ToValue().release());
|
| + }
|
| + return_value->Set("SyncInvalidationListener.UnackedInvalidationsMap",
|
| + unacked_map.release());
|
| + return return_value.Pass();
|
| +}
|
| +
|
| +void SyncInvalidationListener::StopForTest() {
|
| + DCHECK(CalledOnValidThread());
|
| + Stop();
|
| +}
|
| +
|
| +void SyncInvalidationListener::Stop() {
|
| + DCHECK(CalledOnValidThread());
|
| + if (!invalidation_client_) {
|
| + return;
|
| + }
|
| +
|
| + registration_manager_.reset();
|
| + sync_system_resources_.Stop();
|
| + invalidation_client_->Stop();
|
| +
|
| + invalidation_client_.reset();
|
| + delegate_ = NULL;
|
| +
|
| + ticl_state_ = DEFAULT_INVALIDATION_ERROR;
|
| + push_client_state_ = DEFAULT_INVALIDATION_ERROR;
|
| +}
|
| +
|
| +InvalidatorState SyncInvalidationListener::GetState() const {
|
| + DCHECK(CalledOnValidThread());
|
| + if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED ||
|
| + push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) {
|
| + // If either the ticl or the push client rejected our credentials,
|
| + // return INVALIDATION_CREDENTIALS_REJECTED.
|
| + return INVALIDATION_CREDENTIALS_REJECTED;
|
| + }
|
| + if (ticl_state_ == INVALIDATIONS_ENABLED &&
|
| + push_client_state_ == INVALIDATIONS_ENABLED) {
|
| + // If the ticl is ready and the push client notifications are
|
| + // enabled, return INVALIDATIONS_ENABLED.
|
| + return INVALIDATIONS_ENABLED;
|
| + }
|
| + // Otherwise, we have a transient error.
|
| + return TRANSIENT_INVALIDATION_ERROR;
|
| +}
|
| +
|
| +void SyncInvalidationListener::EmitStateChange() {
|
| + DCHECK(CalledOnValidThread());
|
| + delegate_->OnInvalidatorStateChange(GetState());
|
| +}
|
| +
|
| +WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() {
|
| + DCHECK(CalledOnValidThread());
|
| + return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr());
|
| +}
|
| +
|
| +void SyncInvalidationListener::OnNetworkChannelStateChanged(
|
| + InvalidatorState invalidator_state) {
|
| + DCHECK(CalledOnValidThread());
|
| + push_client_state_ = invalidator_state;
|
| + EmitStateChange();
|
| +}
|
| +
|
| +} // namespace syncer
|
|
|