| Index: sync/notifier/sync_invalidation_listener.cc
|
| diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc
|
| index 763cc876bf0e954ca1e8a28c6f78c707215ce4ea..f95513111d42dce8df77f20fd8e048b19b9abe69 100644
|
| --- a/sync/notifier/sync_invalidation_listener.cc
|
| +++ b/sync/notifier/sync_invalidation_listener.cc
|
| @@ -16,14 +16,13 @@
|
| #include "google/cacheinvalidation/types.pb.h"
|
| #include "jingle/notifier/listener/push_client.h"
|
| #include "sync/notifier/invalidation_util.h"
|
| +#include "sync/notifier/object_id_invalidation_map.h"
|
| #include "sync/notifier/registration_manager.h"
|
|
|
| namespace {
|
|
|
| const char kApplicationName[] = "chrome-sync";
|
|
|
| -static const int64 kUnknownVersion = -1;
|
| -
|
| } // namespace
|
|
|
| namespace syncer {
|
| @@ -31,10 +30,8 @@ namespace syncer {
|
| SyncInvalidationListener::Delegate::~Delegate() {}
|
|
|
| SyncInvalidationListener::SyncInvalidationListener(
|
| - base::TickClock* tick_clock,
|
| scoped_ptr<notifier::PushClient> push_client)
|
| - : ack_tracker_(tick_clock, this),
|
| - push_client_(push_client.get()),
|
| + : push_client_(push_client.get()),
|
| sync_system_resources_(push_client.Pass(), this),
|
| delegate_(NULL),
|
| ticl_state_(DEFAULT_INVALIDATION_ERROR),
|
| @@ -56,7 +53,7 @@ void SyncInvalidationListener::Start(
|
| create_invalidation_client_callback,
|
| const std::string& client_id, const std::string& client_info,
|
| const std::string& invalidation_bootstrap_data,
|
| - const InvalidationStateMap& initial_invalidation_state_map,
|
| + const UnackedInvalidationsMap& initial_unacked_invalidations,
|
| const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
|
| Delegate* delegate) {
|
| DCHECK(CalledOnValidThread());
|
| @@ -71,18 +68,7 @@ void SyncInvalidationListener::Start(
|
| sync_system_resources_.storage()->SetInitialState(
|
| invalidation_bootstrap_data);
|
|
|
| - invalidation_state_map_ = initial_invalidation_state_map;
|
| - if (invalidation_state_map_.empty()) {
|
| - DVLOG(2) << "No initial max invalidation versions for any id";
|
| - } else {
|
| - for (InvalidationStateMap::const_iterator it =
|
| - invalidation_state_map_.begin();
|
| - it != invalidation_state_map_.end(); ++it) {
|
| - DVLOG(2) << "Initial max invalidation version for "
|
| - << ObjectIdToString(it->first) << " is "
|
| - << it->second.version;
|
| - }
|
| - }
|
| + unacked_invalidations_map_ = initial_unacked_invalidations;
|
| invalidation_state_tracker_ = invalidation_state_tracker;
|
| DCHECK(invalidation_state_tracker_.IsInitialized());
|
|
|
| @@ -103,19 +89,6 @@ void SyncInvalidationListener::Start(
|
|
|
| registration_manager_.reset(
|
| new RegistrationManager(invalidation_client_.get()));
|
| -
|
| - // Set up reminders for any invalidations that have not been locally
|
| - // acknowledged.
|
| - ObjectIdSet unacknowledged_ids;
|
| - for (InvalidationStateMap::const_iterator it =
|
| - invalidation_state_map_.begin();
|
| - it != invalidation_state_map_.end(); ++it) {
|
| - if (it->second.expected.Equals(it->second.current))
|
| - continue;
|
| - unacknowledged_ids.insert(it->first);
|
| - }
|
| - if (!unacknowledged_ids.empty())
|
| - ack_tracker_.Track(unacknowledged_ids);
|
| }
|
|
|
| void SyncInvalidationListener::UpdateCredentials(
|
| @@ -135,27 +108,6 @@ void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
|
| }
|
| }
|
|
|
| -void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id,
|
| - const AckHandle& ack_handle) {
|
| - DCHECK(CalledOnValidThread());
|
| - InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id);
|
| - if (state_it == invalidation_state_map_.end())
|
| - return;
|
| - invalidation_state_tracker_.Call(
|
| - FROM_HERE,
|
| - &InvalidationStateTracker::Acknowledge,
|
| - id,
|
| - ack_handle);
|
| - state_it->second.current = ack_handle;
|
| - if (state_it->second.expected.Equals(ack_handle)) {
|
| - // If the received ack matches the expected ack, then we no longer need to
|
| - // keep track of |id| since it is up-to-date.
|
| - ObjectIdSet ids;
|
| - ids.insert(id);
|
| - ack_tracker_.Ack(ids);
|
| - }
|
| -}
|
| -
|
| void SyncInvalidationListener::Ready(
|
| invalidation::InvalidationClient* client) {
|
| DCHECK(CalledOnValidThread());
|
| @@ -171,43 +123,24 @@ void SyncInvalidationListener::Invalidate(
|
| const invalidation::AckHandle& ack_handle) {
|
| DCHECK(CalledOnValidThread());
|
| DCHECK_EQ(client, invalidation_client_.get());
|
| - DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation);
|
| + client->Acknowledge(ack_handle);
|
|
|
| const invalidation::ObjectId& id = invalidation.object_id();
|
|
|
| - // The invalidation API spec allows for the possibility of redundant
|
| - // invalidations, so keep track of the max versions and drop
|
| - // invalidations with old versions.
|
| - //
|
| - // TODO(akalin): Now that we keep track of registered ids, we
|
| - // should drop invalidations for unregistered ids. We may also
|
| - // have to filter it at a higher level, as invalidations for
|
| - // newly-unregistered ids may already be in flight.
|
| - InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id);
|
| - if ((it != invalidation_state_map_.end()) &&
|
| - (invalidation.version() <= it->second.version)) {
|
| - // Drop redundant invalidations.
|
| - client->Acknowledge(ack_handle);
|
| - return;
|
| - }
|
| -
|
| std::string payload;
|
| // payload() CHECK()'s has_payload(), so we must check it ourselves first.
|
| if (invalidation.has_payload())
|
| payload = invalidation.payload();
|
|
|
| - DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id)
|
| - << " to " << invalidation.version();
|
| - invalidation_state_map_[id].version = invalidation.version();
|
| - invalidation_state_map_[id].payload = payload;
|
| - invalidation_state_tracker_.Call(
|
| - FROM_HERE,
|
| - &InvalidationStateTracker::SetMaxVersionAndPayload,
|
| - id, invalidation.version(), payload);
|
| + DVLOG(2) << "Received invalidation with version " << invalidation.version()
|
| + << " for " << ObjectIdToString(id);
|
| +
|
| + ObjectIdInvalidationMap invalidations;
|
| + Invalidation inv = Invalidation::Init(id, invalidation.version(), payload);
|
| + inv.set_ack_handler(GetThisAsAckHandler());
|
| + invalidations.Insert(inv);
|
|
|
| - ObjectIdSet ids;
|
| - ids.insert(id);
|
| - PrepareInvalidation(ids, invalidation.version(), payload, client, ack_handle);
|
| + DispatchInvalidations(invalidations);
|
| }
|
|
|
| void SyncInvalidationListener::InvalidateUnknownVersion(
|
| @@ -217,15 +150,14 @@ void SyncInvalidationListener::InvalidateUnknownVersion(
|
| DCHECK(CalledOnValidThread());
|
| DCHECK_EQ(client, invalidation_client_.get());
|
| DVLOG(1) << "InvalidateUnknownVersion";
|
| + client->Acknowledge(ack_handle);
|
| +
|
| + ObjectIdInvalidationMap invalidations;
|
| + Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id);
|
| + unknown_version.set_ack_handler(GetThisAsAckHandler());
|
| + invalidations.Insert(unknown_version);
|
|
|
| - ObjectIdSet ids;
|
| - ids.insert(object_id);
|
| - PrepareInvalidation(
|
| - ids,
|
| - kUnknownVersion,
|
| - std::string(),
|
| - client,
|
| - ack_handle);
|
| + DispatchInvalidations(invalidations);
|
| }
|
|
|
| // This should behave as if we got an invalidation with version
|
| @@ -236,86 +168,56 @@ void SyncInvalidationListener::InvalidateAll(
|
| DCHECK(CalledOnValidThread());
|
| DCHECK_EQ(client, invalidation_client_.get());
|
| DVLOG(1) << "InvalidateAll";
|
| + client->Acknowledge(ack_handle);
|
| +
|
| + ObjectIdInvalidationMap invalidations;
|
| + for (ObjectIdSet::iterator it = registered_ids_.begin();
|
| + it != registered_ids_.end(); ++it) {
|
| + Invalidation unknown_version = Invalidation::InitUnknownVersion(*it);
|
| + unknown_version.set_ack_handler(GetThisAsAckHandler());
|
| + invalidations.Insert(unknown_version);
|
| + }
|
|
|
| - PrepareInvalidation(
|
| - registered_ids_,
|
| - kUnknownVersion,
|
| - std::string(),
|
| - client,
|
| - ack_handle);
|
| + DispatchInvalidations(invalidations);
|
| }
|
|
|
| -void SyncInvalidationListener::PrepareInvalidation(
|
| - const ObjectIdSet& ids,
|
| - int64 version,
|
| - const std::string& payload,
|
| - invalidation::InvalidationClient* client,
|
| - const invalidation::AckHandle& ack_handle) {
|
| +// If a handler is registered, emit right away. Otherwise, save it for later.
|
| +void SyncInvalidationListener::DispatchInvalidations(
|
| + const ObjectIdInvalidationMap& invalidations) {
|
| DCHECK(CalledOnValidThread());
|
|
|
| - // A server invalidation resets the local retry count.
|
| - ack_tracker_.Ack(ids);
|
| - invalidation_state_tracker_.Call(
|
| - FROM_HERE,
|
| - &InvalidationStateTracker::GenerateAckHandles,
|
| - ids,
|
| - base::MessageLoopProxy::current(),
|
| - base::Bind(&SyncInvalidationListener::EmitInvalidation,
|
| - weak_ptr_factory_.GetWeakPtr(),
|
| - ids,
|
| - version,
|
| - payload,
|
| - client,
|
| - ack_handle));
|
| -}
|
| + ObjectIdInvalidationMap to_save = invalidations;
|
| + ObjectIdInvalidationMap to_emit =
|
| + invalidations.GetSubsetWithObjectIds(registered_ids_);
|
|
|
| -void SyncInvalidationListener::EmitInvalidation(
|
| - const ObjectIdSet& ids,
|
| - int64 version,
|
| - const std::string& payload,
|
| - invalidation::InvalidationClient* client,
|
| - const invalidation::AckHandle& ack_handle,
|
| - const AckHandleMap& local_ack_handles) {
|
| - DCHECK(CalledOnValidThread());
|
| + SaveInvalidations(to_save);
|
| + EmitSavedInvalidations(to_emit);
|
| +}
|
|
|
| - ObjectIdInvalidationMap invalidation_map;
|
| - for (AckHandleMap::const_iterator it = local_ack_handles.begin();
|
| - it != local_ack_handles.end(); ++it) {
|
| - // Update in-memory copy of the invalidation state.
|
| - invalidation_state_map_[it->first].expected = it->second;
|
| -
|
| - if (version == kUnknownVersion) {
|
| - Invalidation inv = Invalidation::InitUnknownVersion(it->first);
|
| - inv.set_ack_handle(it->second);
|
| - invalidation_map.Insert(inv);
|
| - } else {
|
| - Invalidation inv = Invalidation::Init(it->first, version, payload);
|
| - inv.set_ack_handle(it->second);
|
| - invalidation_map.Insert(inv);
|
| +void SyncInvalidationListener::SaveInvalidations(
|
| + const ObjectIdInvalidationMap& to_save) {
|
| + ObjectIdSet objects_to_save = to_save.GetObjectIds();
|
| + for (ObjectIdSet::const_iterator it = objects_to_save.begin();
|
| + it != objects_to_save.end(); ++it) {
|
| + UnackedInvalidationsMap::iterator lookup =
|
| + unacked_invalidations_map_.find(*it);
|
| + if (lookup == unacked_invalidations_map_.end()) {
|
| + lookup = unacked_invalidations_map_.insert(
|
| + std::make_pair(*it, UnackedInvalidationSet(*it))).first;
|
| }
|
| + lookup->second.AddSet(to_save.ForObject(*it));
|
| }
|
| - ack_tracker_.Track(ids);
|
| - delegate_->OnInvalidate(invalidation_map);
|
| - client->Acknowledge(ack_handle);
|
| +
|
| + invalidation_state_tracker_.Call(
|
| + FROM_HERE,
|
| + &InvalidationStateTracker::SetSavedInvalidations,
|
| + unacked_invalidations_map_);
|
| }
|
|
|
| -void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) {
|
| - ObjectIdInvalidationMap invalidation_map;
|
| - for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) {
|
| - if (invalidation_state_map_[*it].version == kUnknownVersion) {
|
| - Invalidation inv = Invalidation::InitUnknownVersion(*it);
|
| - inv.set_ack_handle(invalidation_state_map_[*it].expected);
|
| - invalidation_map.Insert(inv);
|
| - } else {
|
| - Invalidation inv = Invalidation::Init(
|
| - *it,
|
| - invalidation_state_map_[*it].version,
|
| - invalidation_state_map_[*it].payload);
|
| - inv.set_ack_handle(invalidation_state_map_[*it].expected);
|
| - invalidation_map.Insert(inv);
|
| - }
|
| - }
|
| - delegate_->OnInvalidate(invalidation_map);
|
| +void SyncInvalidationListener::EmitSavedInvalidations(
|
| + const ObjectIdInvalidationMap& to_emit) {
|
| + DVLOG(2) << "Emitting invalidations: " << to_emit.ToString();
|
| + delegate_->OnInvalidate(to_emit);
|
| }
|
|
|
| void SyncInvalidationListener::InformRegistrationStatus(
|
| @@ -388,6 +290,38 @@ void SyncInvalidationListener::InformError(
|
| EmitStateChange();
|
| }
|
|
|
| +void SyncInvalidationListener::Acknowledge(
|
| + const invalidation::ObjectId& id,
|
| + const syncer::AckHandle& handle) {
|
| + UnackedInvalidationsMap::iterator lookup =
|
| + unacked_invalidations_map_.find(id);
|
| + if (lookup == unacked_invalidations_map_.end()) {
|
| + DLOG(WARNING) << "Received acknowledgement for untracked object ID";
|
| + return;
|
| + }
|
| + lookup->second.Acknowledge(handle);
|
| + invalidation_state_tracker_.Call(
|
| + FROM_HERE,
|
| + &InvalidationStateTracker::SetSavedInvalidations,
|
| + unacked_invalidations_map_);
|
| +}
|
| +
|
| +void SyncInvalidationListener::Drop(
|
| + const invalidation::ObjectId& id,
|
| + const syncer::AckHandle& handle) {
|
| + UnackedInvalidationsMap::iterator lookup =
|
| + unacked_invalidations_map_.find(id);
|
| + if (lookup == unacked_invalidations_map_.end()) {
|
| + DLOG(WARNING) << "Received drop for untracked object ID";
|
| + return;
|
| + }
|
| + lookup->second.Drop(handle);
|
| + invalidation_state_tracker_.Call(
|
| + FROM_HERE,
|
| + &InvalidationStateTracker::SetSavedInvalidations,
|
| + unacked_invalidations_map_);
|
| +}
|
| +
|
| void SyncInvalidationListener::WriteState(const std::string& state) {
|
| DCHECK(CalledOnValidThread());
|
| DVLOG(1) << "WriteState";
|
| @@ -399,13 +333,31 @@ void SyncInvalidationListener::DoRegistrationUpdate() {
|
| DCHECK(CalledOnValidThread());
|
| const ObjectIdSet& unregistered_ids =
|
| registration_manager_->UpdateRegisteredIds(registered_ids_);
|
| - for (ObjectIdSet::const_iterator it = unregistered_ids.begin();
|
| + for (ObjectIdSet::iterator it = unregistered_ids.begin();
|
| it != unregistered_ids.end(); ++it) {
|
| - invalidation_state_map_.erase(*it);
|
| + unacked_invalidations_map_.erase(*it);
|
| }
|
| invalidation_state_tracker_.Call(
|
| - FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids);
|
| - ack_tracker_.Ack(unregistered_ids);
|
| + FROM_HERE,
|
| + &InvalidationStateTracker::SetSavedInvalidations,
|
| + unacked_invalidations_map_);
|
| +
|
| + ObjectIdInvalidationMap object_id_invalidation_map;
|
| + for (UnackedInvalidationsMap::iterator map_it =
|
| + unacked_invalidations_map_.begin();
|
| + map_it != unacked_invalidations_map_.end(); ++map_it) {
|
| + if (registered_ids_.find(map_it->first) == registered_ids_.end()) {
|
| + continue;
|
| + }
|
| + map_it->second.ExportInvalidations(
|
| + GetThisAsAckHandler(),
|
| + &object_id_invalidation_map);
|
| + }
|
| +
|
| + // There's no need to run these through DispatchInvalidations(); they've
|
| + // already been saved to storage (that's where we found them) so all we need
|
| + // to do now is emit them.
|
| + EmitSavedInvalidations(object_id_invalidation_map);
|
| }
|
|
|
| void SyncInvalidationListener::StopForTest() {
|
| @@ -413,23 +365,12 @@ void SyncInvalidationListener::StopForTest() {
|
| Stop();
|
| }
|
|
|
| -InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const {
|
| - DCHECK(CalledOnValidThread());
|
| - return invalidation_state_map_;
|
| -}
|
| -
|
| -AckTracker* SyncInvalidationListener::GetAckTrackerForTest() {
|
| - return &ack_tracker_;
|
| -}
|
| -
|
| void SyncInvalidationListener::Stop() {
|
| DCHECK(CalledOnValidThread());
|
| if (!invalidation_client_) {
|
| return;
|
| }
|
|
|
| - ack_tracker_.Clear();
|
| -
|
| registration_manager_.reset();
|
| sync_system_resources_.Stop();
|
| invalidation_client_->Stop();
|
| @@ -437,8 +378,6 @@ void SyncInvalidationListener::Stop() {
|
| invalidation_client_.reset();
|
| delegate_ = NULL;
|
|
|
| - invalidation_state_tracker_.Reset();
|
| - invalidation_state_map_.clear();
|
| ticl_state_ = DEFAULT_INVALIDATION_ERROR;
|
| push_client_state_ = DEFAULT_INVALIDATION_ERROR;
|
| }
|
| @@ -466,6 +405,11 @@ void SyncInvalidationListener::EmitStateChange() {
|
| delegate_->OnInvalidatorStateChange(GetState());
|
| }
|
|
|
| +WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() {
|
| + DCHECK(CalledOnValidThread());
|
| + return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr());
|
| +}
|
| +
|
| void SyncInvalidationListener::OnNotificationsEnabled() {
|
| DCHECK(CalledOnValidThread());
|
| push_client_state_ = INVALIDATIONS_ENABLED;
|
|
|